diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index db4ccbda01bf0..4341a3b30e2fb 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -11,22 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License - import os import torch import torch.distributed as torch_distrib -from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.core.step_result import Result from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.accelerator import Accelerator +from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.distributed import rank_zero_only +from pytorch_lightning.utilities.distributed import rank_zero_only, sync_ddp_if_available from torch.nn.parallel import DistributedDataParallel -from typing import List, Optional +from typing import List, Optional, Union, Any try: from hydra.utils import to_absolute_path, get_original_cwd @@ -203,3 +201,9 @@ def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) return model + + def sync_tensor(self, + tensor: Union[torch.Tensor], + group: Optional[Any] = None, + reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: + return sync_ddp_if_available(tensor, group, reduce_op) diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index 4b04c17d505e5..fab87750eb4ed 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -11,21 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License -import os -from typing import Any, List, Optional, Union - -import torch -import torch.distributed as torch_distrib -import torch.distributed as dist -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.utilities import AMPType -from pytorch_lightning.utilities.distributed import rank_zero_only -from pytorch_lightning.utilities.distributed import sync_ddp_if_available -from pytorch_lightning.distributed.dist import LightningDistributed +from pytorch_lightning.accelerators.ddp_hpc_accelerator import DDPHPCAccelerator try: @@ -37,167 +23,15 @@ HYDRA_AVAILABLE = True -class DDPCPUHPCAccelerator(Accelerator): +class DDPCPUHPCAccelerator(DDPHPCAccelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): super().__init__(trainer, cluster_environment, ddp_plugin) - self.task_idx = None - self._has_spawned_children = False - self.dist = LightningDistributed() self.nickname = 'ddp_cpu' - def setup(self, model): - self.trainer.model = model - self.task_idx = self.cluster_environment.local_rank() - - def train(self): - model = self.trainer.model - self.ddp_train(process_idx=self.task_idx, model=model) - - def set_world_ranks(self, process_idx): - self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx - self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - def model_to_device(self, model, process_idx): model.cpu() def get_device_ids(self): device_ids = None return device_ids - - def training_step(self, args): - if self.trainer.amp_backend == AMPType.NATIVE: - with torch.cuda.amp.autocast(): - output = self.trainer.model(*args) - else: - output = self.trainer.model(*args) - return output - - def validation_step(self, args): - output = self.training_step(args) - return output - - def test_step(self, args): - output = self.training_step(args) - return output - - def barrier(self, name: Optional[str] = None): - if torch_distrib.is_initialized(): - torch_distrib.barrier() - - def early_stopping_should_stop(self, pl_module): - stop = torch.tensor(int(self.trainer.should_stop), device=pl_module.device) - dist.all_reduce(stop, op=dist.reduce_op.SUM) - dist.barrier() - should_stop = stop == self.trainer.world_size - return should_stop - - def broadcast(self, obj, src=0): - return self.dist.broadcast(obj) - - def ddp_train(self, process_idx, model): - """ - Entry point for ddp - - Args: - process_idx: - mp_queue: multiprocessing queue - model: - - Returns: - Dict with evaluation results - - """ - # determine which process we are and world size - self.set_world_ranks(process_idx) - - # toggle prog bar - if (self.trainer.node_rank != 0 or process_idx != 0) and self.trainer.progress_bar_callback is not None: - self.trainer.progress_bar_callback.disable() - - # set warning rank - rank_zero_only.rank = self.trainer.global_rank - - # set up server using proc 0's ip address - # try to init for 20 times at max in case ports are taken - # where to store ip_table - model.trainer = self.trainer - self.init_ddp_connection( - self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks - ) - - # call setup after the ddp process has connected - self.trainer.call_setup_hook(model) - - # on world_size=0 let everyone know training is starting - if self.trainer.is_global_zero and not torch.distributed.is_initialized(): - log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend} (TORCH_ELASTIC)') - log.info(f'All DDP processes registered. Starting ddp with {self.trainer.world_size} processes') - log.info('-' * 100) - - # call sync_bn before .cuda(), configure_apex and configure_ddp - if self.trainer.sync_batchnorm: - model = self.configure_sync_batchnorm(model) - - # move the model to the correct device - self.model_to_device(model, process_idx) - - # CHOOSE OPTIMIZER - # allow for lr schedulers as well - self.setup_optimizers(model) - - # set model properties before going into wrapper - self.trainer.model_connector.copy_trainer_model_properties(model) - - # 16-bit - model = self.trainer.precision_connector.connect(model) - - # device ids change depending on the DDP setup - device_ids = self.get_device_ids() - - # allow user to configure ddp - model = self.configure_ddp(model, device_ids) - - # set up training routine - self.trainer.train_loop.setup_training(model) - - # train or test - results = self.train_or_test() - - # clean up memory - torch.cuda.empty_cache() - - return results - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> DistributedDataParallel: - model = self.ddp_plugin.configure_ddp(model, device_ids) - return model - - def configure_sync_batchnorm(self, model: LightningModule) -> LightningModule: - """ - Add global batchnorm for a model spread across multiple GPUs and nodes. - - Override to synchronize batchnorm between specific process groups instead - of the whole world or use a different sync_bn like `apex`'s version. - - Args: - model: pointer to current :class:`LightningModule`. - - Return: - LightningModule with batchnorm layers synchronized between process groups - """ - model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model, process_group=None) - - return model - - def sync_tensor(self, - tensor: Union[torch.Tensor], - group: Optional[Any] = None, - reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: - return sync_ddp_if_available(tensor, group, reduce_op) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 0e0822d5747d8..23d7778c1fff7 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -136,7 +136,7 @@ def ddp_train(self, process_idx, model): # on world_size=0 let everyone know training is starting if self.trainer.is_global_zero and not torch.distributed.is_initialized(): log.info('-' * 100) - log.info(f'distributed_backend={self.trainer.distributed_backend} (on SLURM)') + log.info(f'distributed_backend={self.trainer.distributed_backend}') log.info(f'All DDP processes registered. Starting ddp with {self.trainer.world_size} processes') log.info('-' * 100)