From aa24a5e5856a9abc9e61c67c23e550f4f3f373f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sat, 30 Jan 2021 13:13:23 +0100 Subject: [PATCH 01/17] ddp plugins Co-authored-by: Justus Schock <12886177+justusschock@users.noreply.github.com> --- .../plugins/training_type/ddp.py | 252 ++++++++++++++++++ .../plugins/training_type/ddp2.py | 41 +++ .../plugins/training_type/ddp_spawn.py | 214 +++++++++++++++ 3 files changed, 507 insertions(+) create mode 100644 pytorch_lightning/plugins/training_type/ddp.py create mode 100644 pytorch_lightning/plugins/training_type/ddp2.py create mode 100644 pytorch_lightning/plugins/training_type/ddp_spawn.py diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py new file mode 100644 index 0000000000000..06c0a5ce5f03b --- /dev/null +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -0,0 +1,252 @@ +import os +import subprocess +import sys +from time import sleep +from typing import Any, Dict, Optional, Union + +import numpy as np +import torch +import torch.distributed as torch_distrib + +from pytorch_lightning import _logger as log +from pytorch_lightning.plugins .training_type.parallel import ParallelPlugin +from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment +from pytorch_lightning.distributed import LightningDistributed +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel, unwrap_lightning_module +from pytorch_lightning.utilities import _HYDRA_AVAILABLE +from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only, sync_ddp_if_available +from pytorch_lightning.utilities.exceptions import MisconfigurationException +from pytorch_lightning.utilities.seed import seed_everything + +if _HYDRA_AVAILABLE: + from hydra.utils import to_absolute_path, get_original_cwd + from hydra.core.hydra_config import HydraConfig + +if torch.distributed.is_available(): + from torch.distributed import ReduceOp +else: + + class ReduceOp: + SUM = None + + +class DDPPlugin(ParallelPlugin): + + distributed_backend = "ddp" + + def __init__( + self, + parallel_devices, + num_nodes=1, + cluster_environment: ClusterEnvironment = None, + sync_batchnorm=False, + **kwargs: Dict[str, Any], + ) -> None: + super().__init__(parallel_devices=parallel_devices, cluster_environment=cluster_environment) + self.interactive_ddp_procs = [] + self.num_nodes = num_nodes + self.sync_batchnorm = sync_batchnorm + self.dist = LightningDistributed() + self._ddp_kwargs = kwargs + self._has_spawned_children = False + self.task_idx = None + self.node_rank = 0 + self.num_processes = len(parallel_devices) + + @property + def root_device(self): + return self.parallel_devices[self.local_rank] + + @property + def lightning_module(self): + # the model may not be wrapped with DistributedDataParallel if calling this too early + return unwrap_lightning_module(self._model) + + @property + def distributed_sampler_kwargs(self): + distributed_sampler_kwargs = dict(num_replicas=(self.num_nodes * self.num_processes), rank=self.global_rank) + return distributed_sampler_kwargs + + def setup(self, model): + self._model = model + + # start the other scripts + # TODO: make sure this works, in torchelastic we should not launch child processes! 
+ if os.environ.get("PL_IN_DDP_SUBPROCESS", "0") != "1": + self._call_children_scripts() + + # set the task idx + self.task_idx = self.cluster_environment.local_rank() + + def _call_children_scripts(self): + + # bookkeeping of spawned processes + assert self.global_rank == 0 + self._check_can_spawn_children() + self._has_spawned_children = True + + # DDP Environment variables + os.environ["MASTER_ADDR"] = os.environ.get("MASTER_ADDR", "127.0.0.1") + os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", str(find_free_network_port())) + + # allow the user to pass the node rank + node_rank = "0" + node_rank = os.environ.get("NODE_RANK", node_rank) + node_rank = os.environ.get("GROUP_RANK", node_rank) + os.environ["NODE_RANK"] = node_rank + os.environ["LOCAL_RANK"] = "0" + + # when user is using hydra find the absolute path + path_lib = os.path.abspath if not _HYDRA_AVAILABLE else to_absolute_path + + # pull out the commands used to run the script and resolve the abs file path + command = sys.argv + try: + full_path = path_lib(command[0]) + except Exception as e: + full_path = os.path.abspath(command[0]) + + command[0] = full_path + # use the same python interpreter and actually running + command = [sys.executable] + command + + # the visible devices tell us how many GPUs we want to use. + # when the trainer script was called the device has already been scoped by the time + # code reaches this point. so, to call the scripts, we need to leave cuda visible devices alone + # but forward the GPUs selected via environment variables + if self.parallel_devices is None: + raise MisconfigurationException("you selected (distribute_backend = ddp) but did not set Trainer(gpus=?)") + + os.environ["PL_TRAINER_GPUS"] = ",".join([str(device.index) for device in self.parallel_devices]) + os.environ["PL_IN_DDP_SUBPROCESS"] = "1" + + if self.lightning_module.logger is not None: + os.environ["PL_EXP_VERSION"] = str(self.lightning_module.logger.version) + + num_gpus = len(self.parallel_devices) + os.environ["WORLD_SIZE"] = f"{num_gpus * self.num_nodes}" + + self.interactive_ddp_procs = [] + + for local_rank in range(1, self.num_processes): + env_copy = os.environ.copy() + env_copy["LOCAL_RANK"] = f"{local_rank}" + + # remove env var if global seed not set + if os.environ.get("PL_GLOBAL_SEED") is None and "PL_GLOBAL_SEED" in env_copy: + del env_copy["PL_GLOBAL_SEED"] + + # start process + # if hydra is available and initialized, make sure to set the cwd correctly + cwd: Optional[str] = None + if _HYDRA_AVAILABLE: + if HydraConfig.initialized(): + cwd = get_original_cwd() + proc = subprocess.Popen(command, env=env_copy, cwd=cwd) + self.interactive_ddp_procs.append(proc) + + # starting all processes at once can cause issues + # with dataloaders delay between 1-10 seconds + delay = np.random.uniform(1, 5, 1)[0] + sleep(delay) + + def _check_can_spawn_children(self): + if self._has_spawned_children: + raise RuntimeError( + "You tried to run `.fit` or `.test` multiple times in the same script." + " This is not supported in DDP mode, switch to `distributed_backend='ddp_spawn'` instead." 
+ ) + + def set_world_ranks(self): + self.local_rank = self.task_idx + self.node_rank = self.cluster_environment.node_rank() + self.global_rank = self.node_rank * self.num_processes + self.local_rank + self.world_size = self.num_nodes * self.num_processes + + def configure_ddp(self): + # if unset, default `find_unused_parameters` `True` + self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get("find_unused_parameters", True) + self._model = LightningDistributedDataParallel( + self.model, + device_ids=self.determine_ddp_device_ids(), + **self._ddp_kwargs, + ) + + def determine_ddp_device_ids(self): + if self.root_device.type == "cpu": + return None + return [self.root_device.index] + + def init_ddp_connection(self, global_rank: int, world_size: int) -> None: + # TODO: From where to get cluster environment? + os.environ["MASTER_ADDR"] = str(self.cluster_environment.master_address()) + os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port()) + os.environ["WORLD_SIZE"] = str(self.cluster_environment.world_size()) + torch_backend = "nccl" if self.on_gpu else "gloo" + + if not torch.distributed.is_initialized(): + log.info(f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}") + torch_distrib.init_process_group(torch_backend, rank=global_rank, world_size=world_size) + + def pre_training(self): + # TODO: check if needed + seed = os.environ.get("PL_GLOBAL_SEED") + if seed is not None: + seed_everything(int(seed)) + + # determine which process we are and world size + self.set_world_ranks() + + # set warning rank + rank_zero_only.rank = self.global_rank + + # set up server using proc 0's ip address + # try to init for 20 times at max in case ports are taken + # where to store ip_table + self.init_ddp_connection(self.global_rank, self.world_size) + + # TODO: we moved it to the trainer.fit after calling pre_training + # ... need to double check that it is the correct place + # self.trainer.call_setup_hook(self.model) + + # on world_size=0 let everyone know training is starting + if self.is_global_zero and not torch.distributed.is_initialized(): + log.info("-" * 100) + log.info(f"distributed_backend={self.distributed_backend}") + log.info(f"All DDP processes registered. 
Starting ddp with {self.world_size} processes") + log.info("-" * 100) + + # set the ranks and devices + self.dist.rank = self.global_rank + self.dist.device = self.root_device + + if self.sync_batchnorm: + self.model = self.configure_sync_batchnorm(self.model) + + # move the model to the correct device + self.model_to_device() + + self.configure_ddp() + + self.barrier() + + def post_training(self): + if "WORLD_SIZE" in os.environ: + del os.environ["WORLD_SIZE"] + + def barrier(self, *args, **kwargs): + if torch_distrib.is_initialized(): + torch_distrib.barrier() + + def broadcast(self, obj: object, src: int = 0) -> object: + return self.dist.broadcast(obj) + + def model_to_device(self): + if self.root_device.type == "cuda": + torch.cuda.set_device(self.root_device) + self.model.to(self.root_device) + + def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None): + if isinstance(output, torch.Tensor): + output = sync_ddp_if_available(output, group, reduce_op) + return output diff --git a/pytorch_lightning/plugins/training_type/ddp2.py b/pytorch_lightning/plugins/training_type/ddp2.py new file mode 100644 index 0000000000000..c693a004a39e0 --- /dev/null +++ b/pytorch_lightning/plugins/training_type/ddp2.py @@ -0,0 +1,41 @@ +import torch + +from pytorch_lightning.plugins .training_type.ddp import DDPPlugin +from pytorch_lightning.core.step_result import Result + + +class DDP2Plugin(DDPPlugin): + + def setup(self, model): + self._model = model + # set the task idx + self.task_idx = self.cluster_environment.local_rank() + # the difference to DDP is that we don't call children processes here + + def reduce(self, output, *args, **kwargs): + if isinstance(output, Result): + output.dp_reduce() + + elif isinstance(output, torch.Tensor): + output = output.mean() + + return output + + @property + def root_device(self): + return self.parallel_devices[0] + + def model_to_device(self): + # no need to do anything when model is wrapped in torch.nn.DataParallel + pass + + @property + def distributed_sampler_kwargs(self): + distributed_sampler_kwargs = dict(num_replicas=self.num_nodes, rank=self.global_rank) + return distributed_sampler_kwargs + + def set_world_ranks(self): + self.local_rank = self.task_idx + self.node_rank = self.cluster_environment.node_rank() + self.global_rank = self.node_rank + self.world_size = self.num_nodes diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py new file mode 100644 index 0000000000000..95371b48356b6 --- /dev/null +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -0,0 +1,214 @@ +import os +import re +from typing import Any, Dict, Optional, Union + +import torch +import torch.distributed as torch_distrib +import torch.multiprocessing as mp + +from pytorch_lightning import _logger as log +from pytorch_lightning.plugins .training_type.parallel import ParallelPlugin +from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment +from pytorch_lightning.distributed.dist import LightningDistributed +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel, unwrap_lightning_module +from pytorch_lightning.utilities.cloud_io import atomic_save, load as pl_load +from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only +from pytorch_lightning.utilities.distributed import sync_ddp_if_available, rank_zero_warn +from pytorch_lightning.utilities.seed import seed_everything 
+ +if torch.distributed.is_available(): + from torch.distributed import ReduceOp +else: + + class ReduceOp: + SUM = None + + +class DDPSpawnPlugin(ParallelPlugin): + + distributed_backend = "ddp_spawn" + + def __init__( + self, + parallel_devices, + num_nodes=1, + cluster_environment: ClusterEnvironment = None, + sync_batchnorm: bool = False, + **kwargs: Dict[str, Any], + ): + super().__init__(parallel_devices=parallel_devices, cluster_environment=cluster_environment) + self.num_nodes = num_nodes + self.sync_batchnorm = sync_batchnorm + self._ddp_kwargs = kwargs + self.dist = LightningDistributed() + self.num_processes = len(parallel_devices) + self.node_rank = 0 + self.mp_queue = None + + @property + def root_device(self): + return self.parallel_devices[self.local_rank] + + @property + def lightning_module(self): + # the model may not be wrapped with DistributedDataParallel if calling this too early + return unwrap_lightning_module(self._model) + + @property + def distributed_sampler_kwargs(self): + distributed_sampler_kwargs = dict(num_replicas=(self.num_nodes * self.num_processes), rank=self.global_rank) + return distributed_sampler_kwargs + + def setup(self, model): + self._model = model + + os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", str(find_free_network_port())) + + # pass in a state q + smp = mp.get_context("spawn") + self.mp_queue = smp.SimpleQueue() + + def set_world_ranks(self, process_idx): + self.local_rank = process_idx + self.node_rank = self.cluster_environment.node_rank() + self.global_rank = self.node_rank * self.num_processes + self.local_rank + self.world_size = self.num_nodes * self.num_processes + + def start_training(self, trainer): + mp.spawn(self.new_process, nprocs=self.num_processes, args=(trainer,)) + # reset optimizers, since main process is never used for training and thus does not have a valid optim state + trainer.optimizers = [] + + def start_testing(self, trainer): + mp.spawn(self.new_process, nprocs=self.num_processes, args=(trainer,)) + + def new_process(self, process_idx, trainer): + # TODO: check if needed + seed = os.environ.get("PL_GLOBAL_SEED") + if seed is not None: + seed_everything(int(seed)) + + self.set_world_ranks(process_idx) + + # set warning rank + rank_zero_only.rank = self.global_rank + + # set up server using proc 0's ip address + # try to init for 20 times at max in case ports are taken + # where to store ip_table + self.init_ddp_connection(self.global_rank, self.world_size) + + # TODO: we moved it to the trainer.fit after calling pre_training + # ... need to double check that it is the correct place + # self.trainer.call_setup_hook(self.model) + + # on world_size=0 let everyone know training is starting + if self.is_global_zero and not torch.distributed.is_initialized(): + log.info("-" * 100) + log.info(f"distributed_backend={self.distributed_backend}") + log.info(f"All DDP processes registered. 
Starting ddp with {self.world_size} processes") + log.info("-" * 100) + + # set the ranks and devices + self.dist.rank = self.global_rank + self.dist.device = self.root_device + + if self.sync_batchnorm: + self.model = self.configure_sync_batchnorm(self.model) + + # move the model to the correct device + self.model_to_device() + + self.configure_ddp() + + self.barrier() + + if trainer.testing: + results = trainer.run_test() + else: + results = trainer.train() + + # persist info in ddp_spawn + self.transfer_distrib_spawn_state_on_fit_end(results) + + def post_training(self): + # restore main state with best weights + best_path = self.mp_queue.get() + last_path = self.mp_queue.get() + self._results = self.mp_queue.get() + + # recover the weights of the processes trained in the children + self.__recover_child_process_weights(best_path, last_path) + + def configure_ddp(self): + # if unset, default `find_unused_parameters` `True` + self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get("find_unused_parameters", True) + self.model = LightningDistributedDataParallel( + self.model, + device_ids=self.determine_ddp_device_ids(), + **self._ddp_kwargs, + ) + + def init_ddp_connection(self, global_rank: int, world_size: int) -> None: + # TODO: this code is duplicated in DDP and DDPSpawn, make this a function + os.environ["MASTER_ADDR"] = str(self.cluster_environment.master_address()) + os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port()) + os.environ["WORLD_SIZE"] = str(self.cluster_environment.world_size()) + torch_backend = "nccl" if self.on_gpu else "gloo" + + if not torch.distributed.is_initialized(): + log.info(f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}") + torch_distrib.init_process_group(torch_backend, rank=global_rank, world_size=world_size) + + def determine_ddp_device_ids(self): + if self.root_device.type == "cpu": + return None + return [self.root_device.index] + + def transfer_distrib_spawn_state_on_fit_end(self, results): + # TODO: is there a better way than accessing callback through model -> trainer -> callback? + best_model_path = self.lightning_module.trainer.checkpoint_callback.best_model_path + + if self.global_rank == 0 and self.mp_queue is not None: + rank_zero_warn("cleaning up ddp environment...") + + # save the last weights + last_path = None + # TODO: is there a better way than accessing trainer through model -> trainer? + if not self.lightning_module.trainer.testing and best_model_path is not None and len(best_model_path) > 0: + last_path = re.sub(".ckpt", ".tmp_end.ckpt", best_model_path) + atomic_save(self.lightning_module.state_dict(), last_path) + + # todo, pass complete checkpoint as state dictionary + self.mp_queue.put(best_model_path) + self.mp_queue.put(last_path) + self.mp_queue.put(results) + + def __recover_child_process_weights(self, best_path, last_path): + # TODO: is there a better way than accessing callback through model -> trainer -> callback? 
+ # transfer back the best path to the trainer + if self.lightning_module.trainer.checkpoint_callback: + self.lightning_module.trainer.checkpoint_callback.best_model_path = best_path + # todo, pass also best score + + # load last weights + if last_path is not None and not self.lightning_module.trainer.testing: + ckpt = pl_load(last_path, map_location=lambda storage, loc: storage) + self.lightning_module.load_state_dict(ckpt) + + def barrier(self, *args, **kwargs): + if torch_distrib.is_initialized(): + torch_distrib.barrier() + + def broadcast(self, obj: object, src: int = 0) -> object: + return self.dist.broadcast(obj) + + def model_to_device(self): + if self.root_device.type == "cuda": + torch.cuda.set_device(self.root_device) + self.model.to(self.root_device) + + def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None): + if isinstance(output, torch.Tensor): + output = sync_ddp_if_available(output, group, reduce_op) + return output From f4f84966ccaa950a2b25a16d8051c23f2e97e1c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sat, 30 Jan 2021 13:50:47 +0100 Subject: [PATCH 02/17] fix import --- pytorch_lightning/plugins/training_type/ddp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 06c0a5ce5f03b..927d589603cc5 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -9,7 +9,7 @@ import torch.distributed as torch_distrib from pytorch_lightning import _logger as log -from pytorch_lightning.plugins .training_type.parallel import ParallelPlugin +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.distributed import LightningDistributed from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel, unwrap_lightning_module From ccebccb301f73ad24b03b43d6d4d0be6577c934a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 08:42:04 +0100 Subject: [PATCH 03/17] fix import --- pytorch_lightning/plugins/training_type/ddp_spawn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index 95371b48356b6..9cb7750259198 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -7,7 +7,7 @@ import torch.multiprocessing as mp from pytorch_lightning import _logger as log -from pytorch_lightning.plugins .training_type.parallel import ParallelPlugin +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel, unwrap_lightning_module From bd613685a9fd7e91f7f827f06221ffda6261a9a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 08:59:08 +0100 Subject: [PATCH 04/17] add parallel plugin --- .../plugins/training_type/parallel.py | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 pytorch_lightning/plugins/training_type/parallel.py diff --git 
a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py new file mode 100644 index 0000000000000..c929ca11329ba --- /dev/null +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -0,0 +1,96 @@ +from abc import ABC, abstractmethod +from contextlib import contextmanager +from typing import List, Optional + +import torch + +from pytorch_lightning.plugins .training_type.training_type_plugin import TrainingTypePlugin +from pytorch_lightning.cluster_environments import ClusterEnvironment +from pytorch_lightning.core import LightningModule +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel + +if torch.distributed.is_available(): + from torch.distributed import ReduceOp +else: + + class ReduceOp: + SUM = None + + +class ParallelPlugin(TrainingTypePlugin, ABC): + def __init__( + self, + parallel_devices: List[torch.device], + cluster_environment: Optional[ClusterEnvironment] = None, + ): + super().__init__() + self.parallel_devices = parallel_devices + self.local_rank = 0 + self.world_size = 1 + self.cluster_environment = cluster_environment + + @property + @abstractmethod + def root_device(self): + raise NotImplementedError + + @property + def on_gpu(self): + return self.root_device.type == "cuda" and torch.cuda.is_available() + + @abstractmethod + def setup(self, model): + raise NotImplementedError + + def connect(self, model, *args, **kwargs): + self.setup(model) + return self.model + + @property + def is_global_zero(self) -> bool: + return self.global_rank == 0 + + @property + def distributed_sampler_kwargs(self): + distributed_sampler_kwargs = dict( + num_replicas=len(self.parallel_devices), + rank=self.global_rank + ) + return distributed_sampler_kwargs + + def reduce_early_stopping_decision(self, should_stop: bool) -> bool: + should_stop = torch.tensor(int(should_stop), device=self.lightning_module.device) + should_stop = self.reduce(should_stop, reduce_op=ReduceOp.SUM) + should_stop = bool(should_stop == self.world_size) + return should_stop + + @staticmethod + def configure_sync_batchnorm(model: LightningModule) -> LightningModule: + """ + Add global batchnorm for a model spread across multiple GPUs and nodes. + + Override to synchronize batchnorm between specific process groups instead + of the whole world or use a different sync_bn like `apex`'s version. + + Args: + model: pointer to current :class:`LightningModule`. + + Return: + LightningModule with batchnorm layers synchronized between process groups + """ + model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) + return model + + @contextmanager + def block_backward_sync(self): + """ + Blocks ddp sync gradients behaviour on backwards pass. 
+ This is useful for skipping sync when accumulating gradients, reducing communication overhead + Returns: context manager with sync behaviour off + """ + if isinstance(self.model, LightningDistributedDataParallel): + yield self.model.no_sync() + else: + yield None + + \ No newline at end of file From a1e1b5ac1c7f7791c24e24d1c09f9ada022fdf76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:05:14 +0100 Subject: [PATCH 05/17] imports --- pytorch_lightning/plugins/training_type/__init__.py | 6 +++++- pytorch_lightning/plugins/training_type/ddp.py | 6 ++++-- pytorch_lightning/plugins/training_type/ddp_spawn.py | 6 ++++-- pytorch_lightning/plugins/training_type/parallel.py | 2 +- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/__init__.py b/pytorch_lightning/plugins/training_type/__init__.py index 329f6347b17c3..1c17d48fe17dd 100644 --- a/pytorch_lightning/plugins/training_type/__init__.py +++ b/pytorch_lightning/plugins/training_type/__init__.py @@ -1 +1,5 @@ -from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin +from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin # noqa: F401 +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin # noqa: F401 +from pytorch_lightning.plugins.training_type.ddp import DDPPlugin # noqa: F401 +from pytorch_lightning.plugins.training_type.ddp2 import DDP2Plugin # noqa: F401 +from pytorch_lightning.plugins.training_type.ddp_spawn import DDPSpawnPlugin # noqa: F401 diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 927d589603cc5..83ead27ad59bc 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -12,7 +12,7 @@ from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.distributed import LightningDistributed -from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel, unwrap_lightning_module +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.utilities import _HYDRA_AVAILABLE from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only, sync_ddp_if_available from pytorch_lightning.utilities.exceptions import MisconfigurationException @@ -60,7 +60,9 @@ def root_device(self): @property def lightning_module(self): # the model may not be wrapped with DistributedDataParallel if calling this too early - return unwrap_lightning_module(self._model) + # fixme: uncomment when this class will actually be used + # return unwrap_lightning_module(self._model) + pass @property def distributed_sampler_kwargs(self): diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index 9cb7750259198..b3f52410daef4 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -10,7 +10,7 @@ from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.distributed.dist import LightningDistributed -from pytorch_lightning.overrides.data_parallel import 
LightningDistributedDataParallel, unwrap_lightning_module +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.utilities.cloud_io import atomic_save, load as pl_load from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only from pytorch_lightning.utilities.distributed import sync_ddp_if_available, rank_zero_warn @@ -52,7 +52,9 @@ def root_device(self): @property def lightning_module(self): # the model may not be wrapped with DistributedDataParallel if calling this too early - return unwrap_lightning_module(self._model) + # fixme: uncomment when this class will actually be used + # return unwrap_lightning_module(self._model) + pass @property def distributed_sampler_kwargs(self): diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index c929ca11329ba..58d485743a52c 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -4,7 +4,7 @@ import torch -from pytorch_lightning.plugins .training_type.training_type_plugin import TrainingTypePlugin +from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core import LightningModule from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel From 05f5825b738187ced00ced5c98f161f91d70d07e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:05:59 +0100 Subject: [PATCH 06/17] isort --- pytorch_lightning/plugins/training_type/__init__.py | 4 ++-- pytorch_lightning/plugins/training_type/ddp.py | 4 ++-- pytorch_lightning/plugins/training_type/ddp2.py | 2 +- .../plugins/training_type/ddp_spawn.py | 13 +++++++++---- pytorch_lightning/plugins/training_type/parallel.py | 2 +- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/__init__.py b/pytorch_lightning/plugins/training_type/__init__.py index 1c17d48fe17dd..0f814271e16c7 100644 --- a/pytorch_lightning/plugins/training_type/__init__.py +++ b/pytorch_lightning/plugins/training_type/__init__.py @@ -1,5 +1,5 @@ -from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.ddp import DDPPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.ddp2 import DDP2Plugin # noqa: F401 from pytorch_lightning.plugins.training_type.ddp_spawn import DDPSpawnPlugin # noqa: F401 +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin # noqa: F401 +from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin # noqa: F401 diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 83ead27ad59bc..b1452fc1d2280 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -9,18 +9,18 @@ import torch.distributed as torch_distrib from pytorch_lightning import _logger as log -from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.distributed import LightningDistributed from 
pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.utilities import _HYDRA_AVAILABLE from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only, sync_ddp_if_available from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.utilities.seed import seed_everything if _HYDRA_AVAILABLE: - from hydra.utils import to_absolute_path, get_original_cwd from hydra.core.hydra_config import HydraConfig + from hydra.utils import get_original_cwd, to_absolute_path if torch.distributed.is_available(): from torch.distributed import ReduceOp diff --git a/pytorch_lightning/plugins/training_type/ddp2.py b/pytorch_lightning/plugins/training_type/ddp2.py index c693a004a39e0..73c5a812d2d1d 100644 --- a/pytorch_lightning/plugins/training_type/ddp2.py +++ b/pytorch_lightning/plugins/training_type/ddp2.py @@ -1,7 +1,7 @@ import torch -from pytorch_lightning.plugins .training_type.ddp import DDPPlugin from pytorch_lightning.core.step_result import Result +from pytorch_lightning.plugins.training_type.ddp import DDPPlugin class DDP2Plugin(DDPPlugin): diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index b3f52410daef4..f4da9dbcfae55 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -7,13 +7,18 @@ import torch.multiprocessing as mp from pytorch_lightning import _logger as log -from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel -from pytorch_lightning.utilities.cloud_io import atomic_save, load as pl_load -from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only -from pytorch_lightning.utilities.distributed import sync_ddp_if_available, rank_zero_warn +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin +from pytorch_lightning.utilities.cloud_io import atomic_save +from pytorch_lightning.utilities.cloud_io import load as pl_load +from pytorch_lightning.utilities.distributed import ( + find_free_network_port, + rank_zero_only, + rank_zero_warn, + sync_ddp_if_available, +) from pytorch_lightning.utilities.seed import seed_everything if torch.distributed.is_available(): diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index 58d485743a52c..a4e5a78a1e27f 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -4,10 +4,10 @@ import torch -from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core import LightningModule from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel +from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin if torch.distributed.is_available(): from torch.distributed import ReduceOp From 6092dad8d8fd556608ad0cf60db00ca75885ff1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 
31 Jan 2021 09:13:06 +0100 Subject: [PATCH 07/17] update changelog --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a041defcd0028..5773f9f4b004a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -107,9 +107,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Changed the default value for the `progress_bar_refresh_rate` Trainer argument in Google COLAB notebooks to 20 ([#5516](https://github.com/PyTorchLightning/pytorch-lightning/pull/5516)) -- Refactored Accelerators and Plugins ( - [#5715](https://github.com/PyTorchLightning/pytorch-lightning/pull/5715), - ) +- Refactored Accelerators and Plugins + * Added base classes for plugins ([#5715](https://github.com/PyTorchLightning/pytorch-lightning/pull/5715)) + * Added DDP-, DDPSpawn- and DDP2Plugin ([#5714](https://github.com/PyTorchLightning/pytorch-lightning/pull/5714)) ### Deprecated From 69031ae61b201ce521541473f735147cb55fa8a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:19:30 +0100 Subject: [PATCH 08/17] imports --- pytorch_lightning/plugins/training_type/__init__.py | 5 ----- pytorch_lightning/plugins/training_type/parallel.py | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/__init__.py b/pytorch_lightning/plugins/training_type/__init__.py index 0f814271e16c7..e69de29bb2d1d 100644 --- a/pytorch_lightning/plugins/training_type/__init__.py +++ b/pytorch_lightning/plugins/training_type/__init__.py @@ -1,5 +0,0 @@ -from pytorch_lightning.plugins.training_type.ddp import DDPPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.ddp2 import DDP2Plugin # noqa: F401 -from pytorch_lightning.plugins.training_type.ddp_spawn import DDPSpawnPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin # noqa: F401 diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index a4e5a78a1e27f..cc79f139c81ba 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -4,8 +4,8 @@ import torch -from pytorch_lightning.cluster_environments import ClusterEnvironment -from pytorch_lightning.core import LightningModule +from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment +from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin From 905814e08d0183cc10a13465d0b4167aa8dd3c1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:20:02 +0100 Subject: [PATCH 09/17] import --- pytorch_lightning/plugins/training_type/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pytorch_lightning/plugins/training_type/__init__.py b/pytorch_lightning/plugins/training_type/__init__.py index e69de29bb2d1d..329f6347b17c3 100644 --- a/pytorch_lightning/plugins/training_type/__init__.py +++ b/pytorch_lightning/plugins/training_type/__init__.py @@ -0,0 +1 @@ +from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin From 0b2d1725affabe78ea8a93abf7b2cd50cf3afe9b Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:43:19 +0100 Subject: [PATCH 10/17] add docs --- pytorch_lightning/plugins/training_type/ddp.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index b1452fc1d2280..37d596d044b45 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -31,6 +31,14 @@ class ReduceOp: class DDPPlugin(ParallelPlugin): + """ + Plugin for single-device multi-process training on one or multiple nodes. + + The master process in each node spawns N-1 child processes via :func:`subprocess.Popen`, + where N is the number of devices (e.g. GPU) per node. + It is very similar to how :mod:`torch.distributed.launch` launches processes. + + """ distributed_backend = "ddp" From 9c07321c97f4ac4644da7d3e3bb789e47ab4773f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:45:18 +0100 Subject: [PATCH 11/17] yapf formatting --- pytorch_lightning/plugins/training_type/ddp_spawn.py | 4 ++-- pytorch_lightning/plugins/training_type/parallel.py | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index f4da9dbcfae55..a67d287d0741f 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -82,12 +82,12 @@ def set_world_ranks(self, process_idx): self.world_size = self.num_nodes * self.num_processes def start_training(self, trainer): - mp.spawn(self.new_process, nprocs=self.num_processes, args=(trainer,)) + mp.spawn(self.new_process, nprocs=self.num_processes, args=(trainer, )) # reset optimizers, since main process is never used for training and thus does not have a valid optim state trainer.optimizers = [] def start_testing(self, trainer): - mp.spawn(self.new_process, nprocs=self.num_processes, args=(trainer,)) + mp.spawn(self.new_process, nprocs=self.num_processes, args=(trainer, )) def new_process(self, process_idx, trainer): # TODO: check if needed diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index cc79f139c81ba..e5c10e1ad3401 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -18,6 +18,7 @@ class ReduceOp: class ParallelPlugin(TrainingTypePlugin, ABC): + def __init__( self, parallel_devices: List[torch.device], @@ -52,10 +53,7 @@ def is_global_zero(self) -> bool: @property def distributed_sampler_kwargs(self): - distributed_sampler_kwargs = dict( - num_replicas=len(self.parallel_devices), - rank=self.global_rank - ) + distributed_sampler_kwargs = dict(num_replicas=len(self.parallel_devices), rank=self.global_rank) return distributed_sampler_kwargs def reduce_early_stopping_decision(self, should_stop: bool) -> bool: @@ -92,5 +90,3 @@ def block_backward_sync(self): yield self.model.no_sync() else: yield None - - \ No newline at end of file From 1b4e7a9b8fe58b6d2287b73e24f5d9a6a26a6c1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 09:49:17 +0100 Subject: [PATCH 12/17] docs --- pytorch_lightning/plugins/training_type/ddp.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 
37d596d044b45..0050e9948b9b5 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -32,12 +32,11 @@ class ReduceOp: class DDPPlugin(ParallelPlugin): """ - Plugin for single-device multi-process training on one or multiple nodes. + Plugin for multi-process single-device training on one or multiple nodes. The master process in each node spawns N-1 child processes via :func:`subprocess.Popen`, where N is the number of devices (e.g. GPU) per node. It is very similar to how :mod:`torch.distributed.launch` launches processes. - """ distributed_backend = "ddp" From 19947c9c848d043597a38eff78b85812b9bd583a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 10:01:53 +0100 Subject: [PATCH 13/17] ad ddp and horovod --- pytorch_lightning/plugins/training_type/dp.py | 46 ++++++ .../plugins/training_type/horovod.py | 152 ++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 pytorch_lightning/plugins/training_type/dp.py create mode 100644 pytorch_lightning/plugins/training_type/horovod.py diff --git a/pytorch_lightning/plugins/training_type/dp.py b/pytorch_lightning/plugins/training_type/dp.py new file mode 100644 index 0000000000000..935aa5c2189aa --- /dev/null +++ b/pytorch_lightning/plugins/training_type/dp.py @@ -0,0 +1,46 @@ +from typing import List + +import torch + +from pytorch_lightning.core.step_result import Result +from pytorch_lightning.overrides.data_parallel import LightningDataParallel +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin + + +class DataParallelPlugin(ParallelPlugin): + + def __init__(self, parallel_devices: List[torch.device]): + super().__init__(parallel_devices=parallel_devices, cluster_environment=None) + + def setup(self, model): + self._model = LightningDataParallel(model, self.parallel_devices) + + def reduce(self, output, *args, **kwargs): + if isinstance(output, Result): + output.dp_reduce() + + elif isinstance(output, torch.Tensor): + output = output.mean() + + return output + + @property + def root_device(self): + return self.parallel_devices[0] + + @property + def lightning_module(self): + return self._model.module + + def model_to_device(self): + # no need to do anything when model is wrapped in torch.nn.DataParallel + pass + + def barrier(self, *args, **kwargs): + pass + + def broadcast(self, obj: object, src: int = 0) -> object: + return obj + + def reduce_early_stopping_decision(self, should_stop: bool) -> bool: + return should_stop diff --git a/pytorch_lightning/plugins/training_type/horovod.py b/pytorch_lightning/plugins/training_type/horovod.py new file mode 100644 index 0000000000000..7d1d003303f2f --- /dev/null +++ b/pytorch_lightning/plugins/training_type/horovod.py @@ -0,0 +1,152 @@ +from contextlib import ExitStack +from typing import Any, List, Optional, Union + +import torch +from torch.optim.lr_scheduler import _LRScheduler + +from pytorch_lightning.core.optimizer import LightningOptimizer +from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin +from pytorch_lightning.utilities import _HOROVOD_AVAILABLE +from pytorch_lightning.utilities.distributed import rank_zero_only + +if _HOROVOD_AVAILABLE: + import horovod.torch as hvd + +if torch.distributed.is_available(): + from torch.distributed import ReduceOp +else: + + class ReduceOp: + SUM = None + + +class HorovodPlugin(ParallelPlugin): + + def __init__(self, parallel_devices: List[torch.device]): + 
super().__init__(parallel_devices=parallel_devices, cluster_environment=None) + + @property + def root_device(self): + return self.parallel_devices[self.local_rank] + + @property + def distributed_sampler_kwargs(self): + distributed_sampler_kwargs = dict(num_replicas=hvd.size(), rank=hvd.rank()) + return distributed_sampler_kwargs + + def setup(self, model): + self._model = model + + self.global_rank = hvd.rank() + self.local_rank = hvd.local_rank() + rank_zero_only.rank = self.global_rank + + self.model_to_device() + + def pre_training(self): + + def _unpack_lightning_optimizer(opt): + return opt._optimizer if isinstance(opt, LightningOptimizer) else opt + + optimizers = self.lightning_module.trainer.optimizers + optimizers = [_unpack_lightning_optimizer(opt) for opt in optimizers] + + # Horovod: scale the learning rate by the number of workers to account for + # increased total batch size + for optimizer in optimizers: + for param_group in optimizer.param_groups: + param_group["lr"] *= hvd.size() + + # Horovod: adjust base LR used by schedulers to match scaled optimizer initial LR + lr_schedulers = self.lightning_module.trainer.lr_schedulers + for scheduler in lr_schedulers: + scheduler = scheduler["scheduler"] + if isinstance(scheduler, _LRScheduler): + scheduler.base_lrs = [lr * hvd.size() for lr in scheduler.base_lrs] + + # Horovod: broadcast parameters & optimizer state to ensure consistent initialization + hvd.broadcast_parameters(self.lightning_module.state_dict(), root_rank=0) + for optimizer in optimizers: + hvd.broadcast_optimizer_state(optimizer, root_rank=0) + + def _filter_named_parameters(model, optimizer): + opt_params = set([p for group in optimizer.param_groups for p in group.get("params", [])]) + return [(name, p) for name, p in model.named_parameters() if p in opt_params] + + # Horovod: wrap optimizers to perform gradient aggregation via allreduce + optimizers = [ + hvd.DistributedOptimizer( + optimizer, named_parameters=_filter_named_parameters(self.lightning_module, optimizer) + ) for optimizer in optimizers + ] + + optimizers = self.lightning_module.trainer.convert_to_lightning_optimizers(optimizers) + self.lightning_module.trainer.optimizers = optimizers + + def start_training(self, trainer): + with ExitStack() as stack: + for optimizer in trainer.optimizers: + # Synchronization will be performed explicitly following backward() + stack.enter_context(optimizer.skip_synchronize()) + + # set up training routine + self._results = trainer.train() + + # Make sure all workers have finished training before returning to the user + hvd.join() + + def start_testing(self, trainer): + with ExitStack() as stack: + # set up training routine + # self.trainer.train_loop.setup_training(self.trainer.model) + self._results = trainer.run_test() + + # Make sure all workers have finished training before returning to the user + hvd.join() + + def barrier(self, *args, **kwargs): + hvd.join() + + def broadcast(self, obj: object, src: int = 0) -> object: + obj = hvd.broadcast_object(obj, src) + return obj + + def model_to_device(self): + if self.on_gpu: + torch.cuda.set_device(self.root_device) + self.model.to(self.root_device) + + def reduce(self, output, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None): + if group is not None: + raise ValueError( + "Horovod does not support allreduce using a subcommunicator at this time. " + "Unset `group`." 
+ ) + + if reduce_op is None or reduce_op == "sum": + reduce_op = hvd.Sum + elif isinstance(reduce_op, str) and reduce_op in ("avg", "mean"): + reduce_op = hvd.Average + else: + raise ValueError(f"unrecognized `reduce_op`: {reduce_op}") + + # sync all processes before reduction + hvd.join() + return hvd.allreduce(output, op=reduce_op) + + def gather_all_tensors(self, result: Union[torch.Tensor], group: Optional[Any] = None): + if group is not None: + raise ValueError( + "Horovod does not support allgather using a subcommunicator at this time. " + "Unset `group`." + ) + + if len(result.shape) == 0: + # Convert scalars to single dimension tensors + result = result.reshape(1) + + # sync and gather all + hvd.join() + gathered = hvd.allgather(result) + gathered_result = list(gathered.split(1, dim=0)) + return gathered_result From f208e4124b3295405a9b49cab0d7e173ea26d059 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Sun, 31 Jan 2021 10:03:09 +0100 Subject: [PATCH 14/17] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5773f9f4b004a..0d1670c24d69e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -109,7 +109,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Refactored Accelerators and Plugins * Added base classes for plugins ([#5715](https://github.com/PyTorchLightning/pytorch-lightning/pull/5715)) - * Added DDP-, DDPSpawn- and DDP2Plugin ([#5714](https://github.com/PyTorchLightning/pytorch-lightning/pull/5714)) + * Added parallel plugins for DP, DDP, DDPSpawn, DDP2 and Horovod ([#5714](https://github.com/PyTorchLightning/pytorch-lightning/pull/5714)) ### Deprecated From 67d76b0ffcb330b640dcea7e4c42a33c06eb619d Mon Sep 17 00:00:00 2001 From: Jirka Borovec Date: Sun, 31 Jan 2021 13:06:58 +0100 Subject: [PATCH 15/17] . --- pytorch_lightning/plugins/training_type/ddp.py | 13 +++++++++++++ pytorch_lightning/plugins/training_type/ddp2.py | 13 +++++++++++++ .../plugins/training_type/ddp_spawn.py | 13 +++++++++++++ pytorch_lightning/plugins/training_type/dp.py | 13 +++++++++++++ pytorch_lightning/plugins/training_type/horovod.py | 13 +++++++++++++ pytorch_lightning/plugins/training_type/parallel.py | 13 +++++++++++++ 6 files changed, 78 insertions(+) diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 0050e9948b9b5..49e43dfee5045 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import os import subprocess import sys diff --git a/pytorch_lightning/plugins/training_type/ddp2.py b/pytorch_lightning/plugins/training_type/ddp2.py index 73c5a812d2d1d..a7c8477a40c2d 100644 --- a/pytorch_lightning/plugins/training_type/ddp2.py +++ b/pytorch_lightning/plugins/training_type/ddp2.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import torch from pytorch_lightning.core.step_result import Result diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index a67d287d0741f..aea4be1951bef 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import os import re from typing import Any, Dict, Optional, Union diff --git a/pytorch_lightning/plugins/training_type/dp.py b/pytorch_lightning/plugins/training_type/dp.py index 935aa5c2189aa..ce33da87048cc 100644 --- a/pytorch_lightning/plugins/training_type/dp.py +++ b/pytorch_lightning/plugins/training_type/dp.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from typing import List import torch diff --git a/pytorch_lightning/plugins/training_type/horovod.py b/pytorch_lightning/plugins/training_type/horovod.py index 7d1d003303f2f..2005fb38c2e43 100644 --- a/pytorch_lightning/plugins/training_type/horovod.py +++ b/pytorch_lightning/plugins/training_type/horovod.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from contextlib import ExitStack from typing import Any, List, Optional, Union diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index e5c10e1ad3401..01851ca0a666a 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from abc import ABC, abstractmethod from contextlib import contextmanager from typing import List, Optional From 5aad9ff4fa2ce873501d4b1c5f5560acccb1e6a7 Mon Sep 17 00:00:00 2001 From: Jirka Borovec Date: Sun, 31 Jan 2021 13:18:56 +0100 Subject: [PATCH 16/17] simple --- pytorch_lightning/plugins/training_type/ddp.py | 9 +-------- pytorch_lightning/plugins/training_type/ddp_spawn.py | 8 +------- pytorch_lightning/plugins/training_type/horovod.py | 9 +-------- pytorch_lightning/plugins/training_type/parallel.py | 8 +------- 4 files changed, 4 insertions(+), 30 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 49e43dfee5045..3896d779237e0 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -27,7 +27,7 @@ from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.utilities import _HYDRA_AVAILABLE -from pytorch_lightning.utilities.distributed import find_free_network_port, rank_zero_only, sync_ddp_if_available +from pytorch_lightning.utilities.distributed import ReduceOp, find_free_network_port, rank_zero_only, sync_ddp_if_available from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.utilities.seed import seed_everything @@ -35,13 +35,6 @@ from hydra.core.hydra_config import HydraConfig from hydra.utils import get_original_cwd, to_absolute_path -if torch.distributed.is_available(): - from torch.distributed import ReduceOp -else: - - class ReduceOp: - SUM = None - class DDPPlugin(ParallelPlugin): """ diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index aea4be1951bef..a50be00ec7f06 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -27,6 +27,7 @@ from pytorch_lightning.utilities.cloud_io import atomic_save from pytorch_lightning.utilities.cloud_io import load as pl_load from pytorch_lightning.utilities.distributed import ( + ReduceOp, find_free_network_port, rank_zero_only, rank_zero_warn, @@ -34,13 +35,6 @@ ) from pytorch_lightning.utilities.seed import seed_everything -if torch.distributed.is_available(): - from torch.distributed import ReduceOp -else: - - class ReduceOp: - SUM = None - class DDPSpawnPlugin(ParallelPlugin): diff --git a/pytorch_lightning/plugins/training_type/horovod.py 
b/pytorch_lightning/plugins/training_type/horovod.py index 2005fb38c2e43..c4ba668bef44e 100644 --- a/pytorch_lightning/plugins/training_type/horovod.py +++ b/pytorch_lightning/plugins/training_type/horovod.py @@ -20,18 +20,11 @@ from pytorch_lightning.core.optimizer import LightningOptimizer from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.utilities import _HOROVOD_AVAILABLE -from pytorch_lightning.utilities.distributed import rank_zero_only +from pytorch_lightning.utilities.distributed import ReduceOp, rank_zero_only if _HOROVOD_AVAILABLE: import horovod.torch as hvd -if torch.distributed.is_available(): - from torch.distributed import ReduceOp -else: - - class ReduceOp: - SUM = None - class HorovodPlugin(ParallelPlugin): diff --git a/pytorch_lightning/plugins/training_type/parallel.py b/pytorch_lightning/plugins/training_type/parallel.py index 01851ca0a666a..617c762799f29 100644 --- a/pytorch_lightning/plugins/training_type/parallel.py +++ b/pytorch_lightning/plugins/training_type/parallel.py @@ -21,13 +21,7 @@ from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.plugins.training_type.training_type_plugin import TrainingTypePlugin - -if torch.distributed.is_available(): - from torch.distributed import ReduceOp -else: - - class ReduceOp: - SUM = None +from pytorch_lightning.utilities.distributed import ReduceOp class ParallelPlugin(TrainingTypePlugin, ABC): From e4e6d470258776d0a2d67e6a642461e126794583 Mon Sep 17 00:00:00 2001 From: Jirka Borovec Date: Sun, 31 Jan 2021 13:19:59 +0100 Subject: [PATCH 17/17] flake8 --- pytorch_lightning/plugins/training_type/ddp.py | 7 ++++++- pytorch_lightning/plugins/training_type/ddp_spawn.py | 2 +- pytorch_lightning/plugins/training_type/horovod.py | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 3896d779237e0..1128756780518 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -27,7 +27,12 @@ from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.utilities import _HYDRA_AVAILABLE -from pytorch_lightning.utilities.distributed import ReduceOp, find_free_network_port, rank_zero_only, sync_ddp_if_available +from pytorch_lightning.utilities.distributed import ( + find_free_network_port, + rank_zero_only, + ReduceOp, + sync_ddp_if_available, +) from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.utilities.seed import seed_everything diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index a50be00ec7f06..9745fd5dee9f5 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -27,10 +27,10 @@ from pytorch_lightning.utilities.cloud_io import atomic_save from pytorch_lightning.utilities.cloud_io import load as pl_load from pytorch_lightning.utilities.distributed import ( - ReduceOp, find_free_network_port, rank_zero_only, rank_zero_warn, + ReduceOp, sync_ddp_if_available, ) from pytorch_lightning.utilities.seed import seed_everything diff --git a/pytorch_lightning/plugins/training_type/horovod.py 
b/pytorch_lightning/plugins/training_type/horovod.py index c4ba668bef44e..a8bd0091eef6d 100644 --- a/pytorch_lightning/plugins/training_type/horovod.py +++ b/pytorch_lightning/plugins/training_type/horovod.py @@ -20,7 +20,7 @@ from pytorch_lightning.core.optimizer import LightningOptimizer from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin from pytorch_lightning.utilities import _HOROVOD_AVAILABLE -from pytorch_lightning.utilities.distributed import ReduceOp, rank_zero_only +from pytorch_lightning.utilities.distributed import rank_zero_only, ReduceOp if _HOROVOD_AVAILABLE: import horovod.torch as hvd
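
The plugins added in this series are constructed from a list of target devices, and any extra keyword arguments are forwarded to the wrapped LightningDistributedDataParallel module (see configure_ddp above). A minimal usage sketch follows. It assumes the in-progress accelerator refactor exposes these plugins through a `plugins=` Trainer argument and that a cluster environment is injected by the trainer; neither is part of this patch series, so treat the wiring as illustrative only.

    import torch

    from pytorch_lightning import Trainer
    from pytorch_lightning.plugins.training_type.ddp import DDPPlugin

    # one training process per visible GPU on this node
    devices = [torch.device("cuda", i) for i in range(torch.cuda.device_count())]

    ddp = DDPPlugin(
        parallel_devices=devices,
        num_nodes=1,
        sync_batchnorm=True,           # model gets wrapped with SyncBatchNorm in pre_training()
        find_unused_parameters=False,  # forwarded to LightningDistributedDataParallel via **kwargs
    )

    # hypothetical wiring: the Trainer/accelerator integration is not part of this series
    trainer = Trainer(gpus=len(devices), plugins=[ddp])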
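
ParallelPlugin.configure_sync_batchnorm documents that it may be overridden to synchronize batchnorm statistics within a specific process group rather than the whole world. A sketch of such an override, assuming a hypothetical node-local group of ranks 0-3; only the convert_sync_batchnorm call itself comes from the patch above, and the group must be created after the DDP connection is initialized (which holds, since configure_sync_batchnorm runs after init_ddp_connection in pre_training).

    import torch

    from pytorch_lightning.core.lightning import LightningModule
    from pytorch_lightning.plugins.training_type.ddp import DDPPlugin


    class NodeLocalSyncBNDDPPlugin(DDPPlugin):

        @staticmethod
        def configure_sync_batchnorm(model: LightningModule) -> LightningModule:
            # hypothetical: restrict SyncBatchNorm to the ranks of the local node
            # instead of the default world-wide process group
            node_group = torch.distributed.new_group(ranks=[0, 1, 2, 3])
            return torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=node_group)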