diff --git a/CHANGELOG.md b/CHANGELOG.md index a0a423d6968f9..011777002eef9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -272,6 +272,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Removed deprecated utils modules `model_utils`, `warning_utils`, `xla_device_utils` and partially `argparse_utils` ([#7503](https://github.com/PyTorchLightning/pytorch-lightning/pull/7503)) +- Removed `RPCPlugin` and `RPCSequentialPlugin`. If you were successfully using these plugins, please open a GitHub discussion about your use case ([#8101](https://github.com/PyTorchLightning/pytorch-lightning/pull/8101)) + + - Removed deprecated trainer attributes - `on_cpu`, `on_tpu`, `use_tpu`, `on_gpu`, `use_dp`, `use_ddp`, `use_ddp2`, `use_horovod`, `use_single_gpu` ([#7501](https://github.com/PyTorchLightning/pytorch-lightning/pull/7501)) diff --git a/docs/source/api_references.rst b/docs/source/api_references.rst index ff616f6e56f6c..3f9e2c2575cc2 100644 --- a/docs/source/api_references.rst +++ b/docs/source/api_references.rst @@ -89,8 +89,6 @@ Training Type Plugins DDPSpawnPlugin DeepSpeedPlugin HorovodPlugin - RPCPlugin - RPCSequentialPlugin SingleTPUPlugin TPUSpawnPlugin diff --git a/docs/source/extensions/plugins.rst b/docs/source/extensions/plugins.rst index 35e563715e037..436d40f660e7a 100644 --- a/docs/source/extensions/plugins.rst +++ b/docs/source/extensions/plugins.rst @@ -115,8 +115,6 @@ Training Type Plugins DDPSpawnPlugin DeepSpeedPlugin HorovodPlugin - RPCPlugin - RPCSequentialPlugin SingleTPUPlugin TPUSpawnPlugin diff --git a/pl_examples/basic_examples/conv_sequential_example.py b/pl_examples/basic_examples/conv_sequential_example.py deleted file mode 100644 index 9747c4a939340..0000000000000 --- a/pl_examples/basic_examples/conv_sequential_example.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" - -Example script of running the experimental DDP Sequential Plugin. -This script splits a convolutional model onto multiple GPUs, whilst using the internal built in balancer -to balance across your GPUs. 
- -To run: -python conv_model_sequential_example.py --accelerator ddp --gpus 4 --max_epochs 1 --batch_size 256 --use_rpc_sequential -""" -import math -from argparse import ArgumentParser - -import torch -import torch.nn as nn -import torch.nn.functional as F -import torchvision -from torchmetrics.functional import accuracy - -import pytorch_lightning as pl -from pl_examples import cli_lightning_logo -from pytorch_lightning import Trainer -from pytorch_lightning.plugins import RPCSequentialPlugin -from pytorch_lightning.utilities import _BOLTS_AVAILABLE, _FAIRSCALE_PIPE_AVAILABLE - -if _BOLTS_AVAILABLE: - import pl_bolts - from pl_bolts.transforms.dataset_normalizations import cifar10_normalization - -##################### -# Modules # -##################### - - -class Flatten(nn.Module): - - def forward(self, x): - return x.view(x.size(0), -1) - - -############################### -# LightningModule # -############################### - - -class LitResnet(pl.LightningModule): - """ - >>> LitResnet() # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE - LitResnet( - (sequential_module): Sequential(...) - ) - """ - - def __init__(self, lr=0.05, batch_size=32, manual_optimization=False): - super().__init__() - - self.save_hyperparameters() - self.sequential_module = nn.Sequential( - # Conv Layer block 1 - nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1), - nn.BatchNorm2d(32), - nn.ReLU(inplace=False), - nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1), - nn.ReLU(inplace=False), - nn.MaxPool2d(kernel_size=2, stride=2), - - # Conv Layer block 2 - nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1), - nn.BatchNorm2d(128), - nn.ReLU(inplace=False), - nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1), - nn.ReLU(inplace=False), - nn.MaxPool2d(kernel_size=2, stride=2), - nn.Dropout2d(p=0.05), - - # Conv Layer block 3 - nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1), - nn.BatchNorm2d(256), - nn.ReLU(inplace=False), - nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1), - nn.ReLU(inplace=False), - nn.MaxPool2d(kernel_size=2, stride=2), - Flatten(), - nn.Dropout(p=0.1), - nn.Linear(4096, 1024), - nn.ReLU(inplace=False), - nn.Linear(1024, 512), - nn.ReLU(inplace=False), - nn.Dropout(p=0.1), - nn.Linear(512, 10) - ) - self._example_input_array = torch.randn((1, 3, 32, 32)) - - if manual_optimization: - self.automatic_optimization = False - self.training_step = self.training_step_manual - - def forward(self, x): - out = self.sequential_module(x) - return F.log_softmax(out, dim=-1) - - def training_step_manual(self, batch, batch_idx): - opt = self.optimizers() - - def closure(): - x, y = batch - logits = self.forward(x) - loss = F.nll_loss(logits, y) - self.manual_backward(loss, opt) - self.log('train_loss', loss, prog_bar=True) - - opt.step(closure=closure) - - def training_step(self, batch, batch_idx): - x, y = batch - logits = self.forward(x) - loss = F.nll_loss(logits, y) - self.log('Training Loss', loss) - return loss - - def _evaluate(self, batch, batch_idx, stage=None): - x, y = batch - out = self.forward(x) - logits = F.log_softmax(out, dim=-1) - loss = F.nll_loss(logits, y) - preds = torch.argmax(logits, dim=-1) - acc = accuracy(preds, y) - - if stage: - self.log(f'{stage}_loss', loss, prog_bar=True) - self.log(f'{stage}_acc', acc, prog_bar=True) - - return loss, acc - - def validation_step(self, batch, batch_idx): - return self._evaluate(batch, batch_idx, 'val')[0] - - def test_step(self, 
batch, batch_idx): - loss, acc = self._evaluate(batch, batch_idx, 'test') - self.log_dict({'test_loss': loss, 'test_acc': acc}) - - def configure_optimizers(self): - optimizer = torch.optim.SGD(self.parameters(), lr=self.hparams.lr, momentum=0.9, weight_decay=5e-4) - return { - 'optimizer': optimizer, - 'lr_scheduler': { - 'scheduler': torch.optim.lr_scheduler.OneCycleLR( - optimizer, - 0.1, - epochs=self.trainer.max_epochs, - steps_per_epoch=math.ceil(45000 / self.hparams.batch_size) - ), - 'interval': 'step', - } - } - - -################################# -# Instantiate Data Module # -################################# - - -def instantiate_datamodule(args): - train_transforms = torchvision.transforms.Compose([ - torchvision.transforms.RandomCrop(32, padding=4), - torchvision.transforms.RandomHorizontalFlip(), - torchvision.transforms.ToTensor(), - cifar10_normalization(), - ]) - - test_transforms = torchvision.transforms.Compose([ - torchvision.transforms.ToTensor(), - cifar10_normalization(), - ]) - - cifar10_dm = pl_bolts.datamodules.CIFAR10DataModule( - data_dir=args.data_dir, - batch_size=args.batch_size, - train_transforms=train_transforms, - test_transforms=test_transforms, - val_transforms=test_transforms, - ) - - return cifar10_dm - - -if __name__ == "__main__": - cli_lightning_logo() - - assert _BOLTS_AVAILABLE, "Bolts is required for this example, install it via `pip install lightning-bolts`" - assert _FAIRSCALE_PIPE_AVAILABLE, "FairScale and PyTorch 1.6 is required for this example." - - parser = ArgumentParser(description="Pipe Example") - parser.add_argument("--use_rpc_sequential", action="store_true") - parser.add_argument("--manual_optimization", action="store_true") - parser = Trainer.add_argparse_args(parser) - parser = pl_bolts.datamodules.CIFAR10DataModule.add_argparse_args(parser) - args = parser.parse_args() - - cifar10_dm = instantiate_datamodule(args) - - plugins = None - if args.use_rpc_sequential: - plugins = RPCSequentialPlugin() - - model = LitResnet(batch_size=args.batch_size, manual_optimization=args.manual_optimization) - - trainer = pl.Trainer.from_argparse_args(args, plugins=[plugins] if plugins else None) - trainer.fit(model, cifar10_dm) - trainer.test(model, datamodule=cifar10_dm) - - if trainer.accelerator.rpc_enabled: - # Called at the end of trainer to ensure all processes are killed - trainer.training_type_plugin.exit_rpc_process() diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index d9dacd92dc4d7..0abc888ab0395 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -394,10 +394,6 @@ def precision(self) -> Union[str, int]: def scaler(self) -> Optional['GradScaler']: return getattr(self.precision_plugin, 'scaler', None) - @property - def rpc_enabled(self) -> bool: - return self.training_type_plugin.rpc_enabled - def optimizer_state(self, optimizer: Optimizer) -> Dict[str, Tensor]: """ Returns state of an optimizer. 
Allows for syncing/collating optimizer state from processes in custom diff --git a/pytorch_lightning/callbacks/model_checkpoint.py b/pytorch_lightning/callbacks/model_checkpoint.py index 0d1132f191652..336f97c78d3df 100644 --- a/pytorch_lightning/callbacks/model_checkpoint.py +++ b/pytorch_lightning/callbacks/model_checkpoint.py @@ -483,15 +483,6 @@ def _del_model(self, trainer: 'pl.Trainer', filepath: str) -> None: log.debug(f"Removed checkpoint: {filepath}") def _save_model(self, trainer: 'pl.Trainer', filepath: str) -> None: - if trainer.training_type_plugin.rpc_enabled: - # RPCPlugin manages saving all model states - # TODO: the rpc plugin should wrap trainer.save_checkpoint - # instead of us having to do it here manually - trainer.training_type_plugin.rpc_save_model(trainer, self._do_save, filepath) - else: - self._do_save(trainer, filepath) - - def _do_save(self, trainer: 'pl.Trainer', filepath: str) -> None: # in debugging, track when we save checkpoints trainer.dev_debugger.track_checkpointing_history(filepath) diff --git a/pytorch_lightning/plugins/__init__.py b/pytorch_lightning/plugins/__init__.py index cc95671ebf2cc..f620ee28afe9a 100644 --- a/pytorch_lightning/plugins/__init__.py +++ b/pytorch_lightning/plugins/__init__.py @@ -23,8 +23,6 @@ from pytorch_lightning.plugins.training_type.horovod import HorovodPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.ipu import IPUPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.rpc import RPCPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.rpc_sequential import RPCSequentialPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.sharded import DDPShardedPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.sharded_spawn import DDPSpawnShardedPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.single_device import SingleDevicePlugin # noqa: F401 @@ -53,8 +51,6 @@ "SingleTPUPlugin", "TPUHalfPrecisionPlugin", "TPUSpawnPlugin", - "RPCPlugin", - "RPCSequentialPlugin", "TrainingTypePlugin", "ParallelPlugin", "Plugin", diff --git a/pytorch_lightning/plugins/training_type/__init__.py b/pytorch_lightning/plugins/training_type/__init__.py index 3cb43e44f5565..6a56d68e17db9 100644 --- a/pytorch_lightning/plugins/training_type/__init__.py +++ b/pytorch_lightning/plugins/training_type/__init__.py @@ -6,8 +6,6 @@ from pytorch_lightning.plugins.training_type.fully_sharded import DDPFullyShardedPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.horovod import HorovodPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.parallel import ParallelPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.rpc import RPCPlugin # noqa: F401 -from pytorch_lightning.plugins.training_type.rpc_sequential import RPCSequentialPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.sharded import DDPShardedPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.sharded_spawn import DDPSpawnShardedPlugin # noqa: F401 from pytorch_lightning.plugins.training_type.single_device import SingleDevicePlugin # noqa: F401 diff --git a/pytorch_lightning/plugins/training_type/rpc.py b/pytorch_lightning/plugins/training_type/rpc.py deleted file mode 100644 index 3e0f57daef001..0000000000000 --- a/pytorch_lightning/plugins/training_type/rpc.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright The PyTorch Lightning team. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -from contextlib import suppress -from typing import Callable, List, Optional - -import torch - -from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment -from pytorch_lightning.plugins.training_type.ddp import DDPPlugin -from pytorch_lightning.utilities import _RPC_AVAILABLE - -DEFAULT_RPC_TIMEOUT_SEC = 60. -if _RPC_AVAILABLE: - from torch.distributed import rpc - - with suppress(ModuleNotFoundError, ImportError): - from torch.distributed.rpc.constants import DEFAULT_RPC_TIMEOUT_SEC - - -class RPCPlugin(DDPPlugin): - """ - Backbone for RPC Plugins built on top of DDP. - RPC introduces different communication behaviour than DDP. Unlike DDP, processes potentially are not - required to run the same code as the main process. - This leads to edge cases where logic needs to be re-defined. This class contains special cases - that need to be addressed when using RPC communication when building custom RPC Plugins. - """ - - def __init__( - self, - rpc_timeout_sec: float = DEFAULT_RPC_TIMEOUT_SEC, - parallel_devices: Optional[List[torch.device]] = None, - num_nodes: Optional[int] = None, - cluster_environment: Optional[ClusterEnvironment] = None, - sync_batchnorm: Optional[bool] = None, - **kwargs - ): - self.rpc_timeout_sec = rpc_timeout_sec - self._is_rpc_initialized = False - super().__init__( - parallel_devices=parallel_devices, - num_nodes=num_nodes, - cluster_environment=cluster_environment, - sync_batchnorm=sync_batchnorm, - **kwargs - ) - - def init_rpc_connection(self, global_rank: int, world_size: int) -> None: - os.environ['MASTER_PORT'] = os.getenv('RPC_MASTER_PORT', '15000') - rpc.init_rpc(f"worker{global_rank}", rank=global_rank, world_size=world_size) - rpc._set_rpc_timeout(self.rpc_timeout_sec) - self._is_rpc_initialized = True - - def rpc_save_model(self, trainer, save_model_fn: Callable, filepath: str) -> None: - """ - Override to save model to disk. - This is required as the main process will be required to handle aggregating model states from RPC processes. - - Args: - trainer: The trainer object. - save_model_fn: The saving function to save final model. - filepath: The filepath to save the model to. - """ - raise NotImplementedError - - def exit_rpc_process(self): - if self._is_rpc_initialized: - torch.distributed.rpc.shutdown() - self._is_rpc_initialized = False - - @property - def rpc_enabled(self) -> bool: - return True diff --git a/pytorch_lightning/plugins/training_type/rpc_sequential.py b/pytorch_lightning/plugins/training_type/rpc_sequential.py deleted file mode 100644 index a75839cbdb714..0000000000000 --- a/pytorch_lightning/plugins/training_type/rpc_sequential.py +++ /dev/null @@ -1,408 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License -import logging -import os -from typing import Callable, List, Optional - -import torch -import torch.distributed as torch_distrib -from torch import nn -from torch.nn.parallel import DistributedDataParallel -from torch.optim import Optimizer - -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.overrides.distributed import LightningDistributedModule -from pytorch_lightning.plugins.training_type.rpc import DEFAULT_RPC_TIMEOUT_SEC, RPCPlugin -from pytorch_lightning.trainer.states import TrainerFn -from pytorch_lightning.utilities import _FAIRSCALE_PIPE_AVAILABLE, rank_zero_only -from pytorch_lightning.utilities.exceptions import MisconfigurationException - -if _FAIRSCALE_PIPE_AVAILABLE: - import fairscale.nn.model_parallel as mpu - from fairscale.nn import PipeRPCWrapper - from fairscale.nn.pipe import balance as pipe_balance - from fairscale.nn.pipe import rpc as rpc_pipe - from fairscale.nn.pipe.pipeline import PipelineStyle - -log = logging.getLogger(__name__) - - -class RPCSequentialPlugin(RPCPlugin): - - def __init__( - self, - balance: Optional[List[int]] = None, - microbatches: int = 8, - checkpoint: str = 'except_last', - balance_mode: str = "balance_by_size", - pipelined_backward: Optional[bool] = True, - rpc_timeout_sec: float = DEFAULT_RPC_TIMEOUT_SEC, - **kwargs - ): - """ - Provides sequential model parallelism for :class:`nn.Sequential ` module. - If the module requires lots of memory, Pipe can be used to reduce this by leveraging multiple GPUs. - - .. _RPCSequentialPlugin: https://arxiv.org/abs/1811.06965 - - Pipeline parallelism comes with with checkpointing to reduce peak - memory required to train while minimizing device under-utilization. - This is turned on by default and can be turned off via the checkpoint argument. - - You should determine the balance when defining the plugin, - or you can pass an example input array via the LightningModule to infer a balance. - The module will be partitioned into multiple devices according to the given balance. You may also rely on - your own heuristics to find your own optimal configuration. - - Args: - balance: The balance of the model, i.e [2, 2] (two layers on each GPU). - If not provided assumes user provides an input example array to find a balance on all GPUs. - - microbatches: Allows for parallelization to reduce device utilization - by splitting the batch into further smaller batches. - - checkpoint: Enables gradient checkpointing. ['always', 'except_last', 'never'] - - balance_mode: Type of balance heuristic to use if balance to be inferred. - - - 'balance_by_size': checks memory usage of each layer and determines balance - - - 'balance_by_time': checks time of each layer and determines balance - - pipelined_backward: if True, call torch.autograd.backward once per microbatch on the - - backward pass (instead of once for the whole batch). This works - around a potential deadlock in pytorch when using tensor parallelism - at the same time. 
Defaults to `True` if - `get_model_parallel_world_size() > 1` - """ - self._check_pipe_available() - super().__init__(rpc_timeout_sec=rpc_timeout_sec, **kwargs) - - self.balance = balance - - self.microbatches = microbatches - self.checkpoint = checkpoint - self.balance_mode = balance_mode - self.pipelined_backward = pipelined_backward - self._main_rpc_process = True - - def init_ddp_connection( - self, - global_rank: Optional[int] = None, - world_size: Optional[int] = None, - ) -> None: - if self.lightning_module.trainer.amp_backend is not None: - raise MisconfigurationException( - '`RPCSequentialPlugin` is currently not supported in Automatic Mixed Precision' - ) - - if self._skip_init_connections(): - return - - global_rank = global_rank if global_rank is not None else self.cluster_environment.global_rank() - world_size = world_size if world_size is not None else self.cluster_environment.world_size() - super().init_ddp_connection(global_rank, world_size) - super().init_rpc_connection(global_rank=global_rank, world_size=world_size) - model = self.lightning_module - self.gpus_per_model = self._infer_check_num_gpus() - self.init_model_parallel_groups() - self.set_main_rpc_process() - - self._check_sequential_model_exists(model) - - # check if user given balance is valid - if self.balance is not None: - self._assert_valid_model_balance() - - if self.main_rpc_process: - if self.balance is None: - self._infer_model_balance() - self.init_pipe_module() - else: - self.handle_transferred_pipe_module() - self.exit_rpc_process() - - def _infer_model_balance(self): - log.info(f'Inferring model balance using {self.balance_mode} mode') - model = self.lightning_module - if model.example_input_array is None: - raise MisconfigurationException( - 'Please set example_input_array to your model, so we can infer the right model balance for you' - ) - balance_func = getattr(pipe_balance, self.balance_mode) - self.balance = balance_func(self.gpus_per_model, model.sequential_module, model.example_input_array) - self._sync_balance_to_all_parallel_groups() - - log.info(f'The following model balance {self.balance.tolist()} was inferred using {self.balance_mode} mode') - - def _sync_balance_to_all_parallel_groups(self, main_rank=0): - """ - Ensures that we sync the balance to all main processes, so that the balance is the same per replica. - Args: - main_rank: The rank with the balance we'd like to replicate. - """ - self.balance = torch.tensor(self.balance, dtype=torch.int, device='cuda') - # Ensure we sync to all processes within the main data parallel group - # We use the data parallel group as all main processes are found within the same group - torch_distrib.broadcast(self.balance, src=main_rank, group=mpu.get_data_parallel_group()) - self.balance = self.balance.cpu() - - def _check_sequential_model_exists(self, model): - if not hasattr(model, "sequential_module") or not isinstance(model.sequential_module, nn.Sequential): - raise MisconfigurationException( - 'Could not find a PipeLightningModule within the model. ' - 'Did you set your sequential model as the `sequential_module` attribute of your model?' 
- ) - - def _find_and_init_pipe_module(self, model): - if hasattr(model, "sequential_module") and isinstance(model.sequential_module, LightningPipeModule): - # model has been wrapped already - return - elif hasattr(model, "sequential_module") and isinstance(model.sequential_module, nn.Sequential): - # try to wrap model for the user - model.sequential_module = LightningPipeModule( - model.sequential_module, - balance=self.balance, - microbatches=self.microbatches, - checkpoint=self.checkpoint, - ) - # Update references for workers to access correct lightning functions when calling RPC - model.sequential_module.trainer = model.trainer - model.sequential_module.configure_optimizers = model.configure_optimizers - - # Update references for main process to access correct lightning functions when calling RPC - model.sequential_module.module.model.trainer = model.trainer - model.sequential_module.module.model.configure_optimizers = model.configure_optimizers - - self.model = model - - else: - raise MisconfigurationException( - 'Could not find a PipeLightningModule within the model. ' - 'Did you defined set your sequential model as a `sequential_module` attribute of your model?' - ) - - def _assert_valid_model_balance(self): - model = self.lightning_module - if sum(self.balance) != len(model.sequential_module): - raise MisconfigurationException( - f'The provided balance sum: {sum(self.balance)} does not' - f' match your Sequential length: {len(model.sequential_module)}' - ) - - def _skip_init_connections(self): - """ - Skip initialization if torch is already initialized and we're in testing. - Returns: Whether to skip initialization - - """ - return torch_distrib.is_initialized() and self.lightning_module.trainer.state.fn != TrainerFn.FITTING - - def init_model_parallel_groups(self): - num_model_parallel = 1 # TODO currently no support for vertical model parallel - mpu.initialize_model_parallel(model_parallel_size_=num_model_parallel, pipeline_length=self.gpus_per_model) - - def _infer_check_num_gpus(self): - """ - Infer the number of GPUs per model. 
- - Returns: The appropriate balance for the model - """ - if isinstance(self.balance, list): - if len(self.balance) != (self.world_size / self.num_nodes): - raise MisconfigurationException( - "Pipe currently only supports splitting the module onto all available GPUs" - ) - # User has defined a balance for his model - return len(self.balance) - # Assume that the user wants to balance his model on all GPUs - return self.world_size - - def handle_transferred_pipe_module(self) -> None: - if self.lightning_module.trainer.state.fn == TrainerFn.FITTING: - torch_distrib.barrier() # Ensure we await main process initialization - # Add trainer/configure_optimizers to the pipe model for access in all worker processes - rpc_pipe.PipeModel.trainer = self.lightning_module.trainer - del rpc_pipe.PipeModel.trainer.model.sequential_module - rpc_pipe.PipeModel.trainer.model.sequential_module = rpc_pipe.PipeModel - rpc_pipe.PipeModel.configure_optimizers = self.lightning_module.configure_optimizers - - def init_pipe_module(self) -> None: - # Create pipe_module - model = self.lightning_module - self._find_and_init_pipe_module(model) - if self.lightning_module.trainer.state.fn == TrainerFn.FITTING: - torch_distrib.barrier() # Ensure we join main process initialization - model.sequential_module.foreach_worker(register_optimizers, include_self=True) - - # TODO: Move this to the connector - - def pre_backward(self, closure_loss: torch.Tensor, should_accumulate: bool, optimizer: Optimizer, opt_idx: int): - """Run before precision plugin executes backward""" - - def configure_ddp(self): - if self.main_rpc_process: - self.pre_configure_ddp() - - self._model = DistributedDataParallel( - LightningDistributedModule(self.model), - device_ids=self.determine_ddp_device_ids(), - process_group=mpu.get_data_parallel_group(), - **self._ddp_kwargs, - ) - # Plugin handle backwards across processes. 
Currently not supported for DDP + pipe parallel - self._model.require_backward_grad_sync = False - - @rank_zero_only - def rpc_save_model(self, trainer, save_model_fn: Callable, filepath: str) -> None: - model = self.lightning_module - if not hasattr(model.sequential_module, "foreach_worker"): - return - current_layers = model.sequential_module - model.sequential_module.foreach_worker( - save_layers_on_all_rank_zero_workers, {"gpus_per_model": self.gpus_per_model}, include_self=True - ) - model.sequential_module = load_sequential_from_saved_layers(self.gpus_per_model) - save_model_fn(trainer, filepath) - model.sequential_module = current_layers - - def worker_optimizer_step(self, model: LightningModule, opt_idx: int, *args, **kwargs) -> None: - model.sequential_module.foreach_worker( - run_optimizer, { - "opt_idx": opt_idx, - "args": args, - "kwargs": kwargs - }, include_self=False - ) - - @property - def distributed_sampler_kwargs(self): - return dict( - num_replicas=mpu.get_data_parallel_world_size(), - rank=mpu.get_data_parallel_rank(), - ) - - @property - def data_parallel_group(self): - return mpu.get_data_parallel_group() - - def set_main_rpc_process(self): - self.main_rpc_process = torch_distrib.get_rank(group=mpu.get_pipeline_parallel_group()) == 0 - - @property - def main_rpc_process(self) -> bool: - return self._main_rpc_process - - @main_rpc_process.setter - def main_rpc_process(self, is_main_process): - self._main_rpc_process = is_main_process - - def barrier(self, name: Optional[str] = None) -> None: - if torch_distrib.is_initialized() and self.main_rpc_process: - torch_distrib.barrier(group=self.data_parallel_group) - - def _check_pipe_available(self): - if not _FAIRSCALE_PIPE_AVAILABLE: - raise MisconfigurationException( - 'PipeRPCPlugin requires FairScale and currently is only supported on PyTorch 1.6.' - ) - - def post_optimizer_step(self, optimizer: Optimizer, optimizer_idx: int, **kwargs) -> None: - """Hook to do something after each optimizer step.""" - if self.rpc_enabled and self.main_rpc_process: - # Initialize optimizer step on main process - self.worker_optimizer_step(model=self.lightning_module, opt_idx=optimizer_idx, **kwargs) - - def post_training_step(self): - if self.main_rpc_process: - super().post_training_step() - - def start_training(self, trainer) -> None: - if self.main_rpc_process: - super().start_training(trainer) - - def start_evaluating(self, trainer) -> None: - if self.main_rpc_process: - super().start_evaluating(trainer) - - -class LightningPipeModule(nn.Module): - """ - This class wraps Fairscale Pipe and PipeRCPWrapper class. - """ - - def __init__(self, module: nn.Sequential, balance: List[int], microbatches: int = 8, checkpoint='never'): - super().__init__() - self.module = module - self.balance = balance - self.microbatches = microbatches - self.checkpoint = checkpoint - self._init_pipe() - - def _init_pipe(self): - device = torch.device("cuda", torch_distrib.get_rank()) - - self.module = PipeRPCWrapper( - module=self.module, - balance=self.balance, - chunks=self.microbatches, - style=PipelineStyle.MultiProcess, - input_device=device, - worker_map=self.get_worker_map(), - checkpoint=self.checkpoint, - ) - - def foreach_worker(self, *args, **kwargs): - self.module.foreach_worker(*args, **kwargs) - - def forward(self, *args, **kwargs): - return self.module(*args, **kwargs) - - def get_worker_map(self): - # TODO, is this correct with multinodes? 
We also assume "worker" is the same as defined in the RPCPlugin - return {rank: f"worker{rank}" for rank in range(torch_distrib.get_world_size())} - - -def register_optimizers(ctx, model): - optimizers, lr_schedulers, optimizer_frequencies = model.trainer.init_optimizers(model) - model.trainer.optimizers = optimizers - model.trainer.lr_schedulers = lr_schedulers - model.trainer.optimizer_frequencies = optimizer_frequencies - - -def run_optimizer(ctx, model): - trainer = model.trainer - opt_idx = ctx["opt_idx"] - optimizer = trainer.optimizers[opt_idx] - optimizer.step(*ctx["args"], **ctx["kwargs"]) - - -def save_layers_on_all_rank_zero_workers(ctx, model): - gpus_per_model = ctx["gpus_per_model"] - rank = torch_distrib.get_rank() - if rank in range(gpus_per_model): - seq = list(model.children())[0] - torch.save(seq, f"seq_{rank}.pt") - - -def load_sequential_from_saved_layers(gpus_per_model): - partial_seqs = [torch.load(f"seq_{rank}.pt", map_location='cpu') for rank in range(gpus_per_model)] - seq = nn.Sequential() - for p_seq in partial_seqs: - for name, child in p_seq.named_children(): - seq.add_module(name, child) - # delete tmp files - [os.remove(f"seq_{rank}.pt") for rank in range(gpus_per_model)] - return seq diff --git a/pytorch_lightning/plugins/training_type/training_type_plugin.py b/pytorch_lightning/plugins/training_type/training_type_plugin.py index 4c825a93da290..e7ca73bc9f40d 100644 --- a/pytorch_lightning/plugins/training_type/training_type_plugin.py +++ b/pytorch_lightning/plugins/training_type/training_type_plugin.py @@ -145,10 +145,6 @@ def results(self) -> Optional[Union[_EVALUATE_OUTPUT, _PREDICT_OUTPUT]]: """ return self._results - @property - def rpc_enabled(self) -> bool: - return False - def load_checkpoint_file(self, checkpoint_path: Union[str, Path]) -> Dict[str, Any]: return pl_load(checkpoint_path, map_location=(lambda storage, loc: storage)) diff --git a/pytorch_lightning/utilities/__init__.py b/pytorch_lightning/utilities/__init__.py index c2e727d314396..536b36ceb81b0 100644 --- a/pytorch_lightning/utilities/__init__.py +++ b/pytorch_lightning/utilities/__init__.py @@ -32,7 +32,6 @@ _FAIRSCALE_AVAILABLE, _FAIRSCALE_FULLY_SHARDED_AVAILABLE, _FAIRSCALE_OSS_FP16_BROADCAST_AVAILABLE, - _FAIRSCALE_PIPE_AVAILABLE, _GROUP_AVAILABLE, _HOROVOD_AVAILABLE, _HYDRA_AVAILABLE, @@ -43,7 +42,6 @@ _NATIVE_AMP_AVAILABLE, _OMEGACONF_AVAILABLE, _POPTORCH_AVAILABLE, - _RPC_AVAILABLE, _TORCH_GREATER_EQUAL_1_5, _TORCH_GREATER_EQUAL_1_6, _TORCH_GREATER_EQUAL_1_7, diff --git a/pytorch_lightning/utilities/enums.py b/pytorch_lightning/utilities/enums.py index 3cb4d24d126fa..98f2770d03cf9 100644 --- a/pytorch_lightning/utilities/enums.py +++ b/pytorch_lightning/utilities/enums.py @@ -79,7 +79,6 @@ def is_interactive_compatible(self) -> bool: HOROVOD = 'horovod' DDP_SHARDED = 'ddp_sharded' DDP_SHARDED_SPAWN = 'ddp_sharded_spawn' - RPC_SEQUENTIAL_PLUGIN = 'rpc_sequential' DDP_FULLY_SHARDED = "ddp_fully_sharded" diff --git a/pytorch_lightning/utilities/imports.py b/pytorch_lightning/utilities/imports.py index 2a51b01404821..3125a2d38f15e 100644 --- a/pytorch_lightning/utilities/imports.py +++ b/pytorch_lightning/utilities/imports.py @@ -75,7 +75,6 @@ def _compare_version(package: str, op, version) -> bool: _BOLTS_AVAILABLE = _module_available('pl_bolts') _DEEPSPEED_AVAILABLE = not _IS_WINDOWS and _module_available('deepspeed') _FAIRSCALE_AVAILABLE = _TORCH_GREATER_EQUAL_1_6 and not _IS_WINDOWS and _module_available('fairscale.nn') -_FAIRSCALE_PIPE_AVAILABLE = _FAIRSCALE_AVAILABLE and 
_compare_version("fairscale", operator.le, "0.1.3") _FAIRSCALE_OSS_FP16_BROADCAST_AVAILABLE = _FAIRSCALE_AVAILABLE and _compare_version("fairscale", operator.ge, "0.3.3") _FAIRSCALE_FULLY_SHARDED_AVAILABLE = _FAIRSCALE_AVAILABLE and _compare_version("fairscale", operator.ge, "0.3.4") _GROUP_AVAILABLE = not _IS_WINDOWS and _module_available('torch.distributed.group') @@ -86,7 +85,6 @@ def _compare_version(package: str, op, version) -> bool: _NATIVE_AMP_AVAILABLE = _module_available("torch.cuda.amp") and hasattr(torch.cuda.amp, "autocast") _OMEGACONF_AVAILABLE = _module_available("omegaconf") _POPTORCH_AVAILABLE = _module_available('poptorch') -_RPC_AVAILABLE = not _IS_WINDOWS and _module_available('torch.distributed.rpc') _TORCH_QUANTIZE_AVAILABLE = bool([eg for eg in torch.backends.quantized.supported_engines if eg != 'none']) _TORCHTEXT_AVAILABLE = _module_available("torchtext") _TORCHVISION_AVAILABLE = _module_available('torchvision') diff --git a/tests/helpers/runif.py b/tests/helpers/runif.py index 737ddd68dff17..e4a1d20f72872 100644 --- a/tests/helpers/runif.py +++ b/tests/helpers/runif.py @@ -25,11 +25,9 @@ _DEEPSPEED_AVAILABLE, _FAIRSCALE_AVAILABLE, _FAIRSCALE_FULLY_SHARDED_AVAILABLE, - _FAIRSCALE_PIPE_AVAILABLE, _HOROVOD_AVAILABLE, _IPU_AVAILABLE, _NATIVE_AMP_AVAILABLE, - _RPC_AVAILABLE, _TORCH_QUANTIZE_AVAILABLE, _TPU_AVAILABLE, ) @@ -69,9 +67,7 @@ def __new__( horovod_nccl: bool = False, skip_windows: bool = False, special: bool = False, - rpc: bool = False, fairscale: bool = False, - fairscale_pipe: bool = False, fairscale_fully_sharded: bool = False, deepspeed: bool = False, **kwargs @@ -92,9 +88,7 @@ def __new__( horovod_nccl: if Horovod is installed with NCCL support skip_windows: skip test for Windows platform (typically fo some limited torch functionality) special: running in special mode, outside pytest suit - rpc: requires Remote Procedure Call (RPC) fairscale: if `fairscale` module is required to run the test - fairscale_pipe: if `fairscale` with pipe module is required to run the test fairscale_fully_sharded: if `fairscale` fully sharded module is required to run the test deepspeed: if `deepspeed` module is required to run the test kwargs: native pytest.mark.skipif keyword arguments @@ -159,18 +153,10 @@ def __new__( conditions.append(env_flag != '1') reasons.append("Special execution") - if rpc: - conditions.append(not _RPC_AVAILABLE) - reasons.append("RPC") - if fairscale: conditions.append(not _FAIRSCALE_AVAILABLE) reasons.append("Fairscale") - if fairscale_pipe: - conditions.append(not _FAIRSCALE_PIPE_AVAILABLE) - reasons.append("Fairscale Pipe") - if fairscale_fully_sharded: conditions.append(not _FAIRSCALE_FULLY_SHARDED_AVAILABLE) reasons.append("Fairscale Fully Sharded") diff --git a/tests/plugins/test_cluster_integration.py b/tests/plugins/test_cluster_integration.py index f9ca8c23d34d9..9f5eba43cf5a0 100644 --- a/tests/plugins/test_cluster_integration.py +++ b/tests/plugins/test_cluster_integration.py @@ -18,7 +18,7 @@ import torch from pytorch_lightning import Trainer -from pytorch_lightning.plugins import DDP2Plugin, DDPPlugin, DDPShardedPlugin, DeepSpeedPlugin, RPCSequentialPlugin +from pytorch_lightning.plugins import DDP2Plugin, DDPPlugin, DDPShardedPlugin, DeepSpeedPlugin from pytorch_lightning.plugins.environments import LightningEnvironment, SLURMEnvironment, TorchElasticEnvironment from pytorch_lightning.utilities import rank_zero_only from tests.helpers.runif import RunIf @@ -66,7 +66,6 @@ def environment_combinations(): DDPShardedPlugin, DDP2Plugin, 
pytest.param(DeepSpeedPlugin, marks=RunIf(deepspeed=True)), - pytest.param(RPCSequentialPlugin, marks=RunIf(fairscale_pipe=True)), ], ) def test_ranks_available_manual_plugin_selection(plugin_cls): diff --git a/tests/plugins/test_rpc_plugin.py b/tests/plugins/test_rpc_plugin.py deleted file mode 100644 index 7abf9fcbd5039..0000000000000 --- a/tests/plugins/test_rpc_plugin.py +++ /dev/null @@ -1,89 +0,0 @@ -import os -from typing import Optional -from unittest import mock - -import pytest - -from pytorch_lightning import Trainer -from pytorch_lightning.callbacks import Callback -from pytorch_lightning.plugins.training_type.rpc_sequential import RPCPlugin -from tests.helpers.boring_model import BoringModel -from tests.helpers.runif import RunIf - - -@mock.patch.dict( - os.environ, - { - "CUDA_VISIBLE_DEVICES": "0,1", - "SLURM_NTASKS": "2", - "SLURM_JOB_NAME": "SOME_NAME", - "SLURM_NODEID": "0", - "LOCAL_RANK": "0", - "SLURM_PROCID": "0", - "SLURM_LOCALID": "0", - }, -) -@mock.patch("torch.cuda.device_count", return_value=2) -@pytest.mark.parametrize( - ["ddp_backend", "gpus", "num_processes"], - [("ddp_cpu", None, 2), ("ddp", 2, 0), ("ddp_spawn", 2, 0)], -) -@RunIf(rpc=True) -def test_rpc_choice(tmpdir, ddp_backend, gpus, num_processes): - - class CB(Callback): - - def on_fit_start(self, trainer, pl_module): - assert isinstance(trainer.training_type_plugin, RPCPlugin) - raise RuntimeError('finished plugin check') - - model = BoringModel() - trainer = Trainer( - default_root_dir=str(tmpdir), - fast_dev_run=True, - gpus=gpus, - num_processes=num_processes, - distributed_backend=ddp_backend, - callbacks=[CB()], - plugins=[RPCPlugin()] - ) - - with pytest.raises(RuntimeError, match='finished plugin check'): - trainer.fit(model) - - -class CustomRPCPlugin(RPCPlugin): - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.rpc_save_model_count = 0 - self.worker_optimizer_step_count = 0 - - def rpc_save_model(self, *_) -> None: - self.rpc_save_model_count += 1 - - def barrier(self, name: Optional[str] = None) -> None: - return - - -@RunIf(min_gpus=2, special=True, rpc=True) -def test_rpc_function_calls_ddp(tmpdir): - model = BoringModel() - plugin = CustomRPCPlugin() - max_epochs = 2 - limit_train_batches = 2 - trainer = Trainer( - limit_train_batches=limit_train_batches, - limit_val_batches=2, - max_epochs=max_epochs, - gpus=2, - distributed_backend='ddp', - plugins=[plugin], - default_root_dir=tmpdir, - ) - - trainer.fit(model) - if trainer.global_rank == 0: # Main process - assert plugin.rpc_save_model_count == max_epochs - else: # Worker process - assert plugin.rpc_save_model_count == max_epochs diff --git a/tests/plugins/test_rpc_sequential_plugin.py b/tests/plugins/test_rpc_sequential_plugin.py deleted file mode 100644 index 00a6220036c3e..0000000000000 --- a/tests/plugins/test_rpc_sequential_plugin.py +++ /dev/null @@ -1,185 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import os -from unittest import mock - -import pytest -import torch -import torch.distributed as torch_distrib -from torch import nn - -from pytorch_lightning import LightningModule, Trainer -from pytorch_lightning.plugins.training_type.rpc_sequential import RPCSequentialPlugin -from pytorch_lightning.utilities.exceptions import MisconfigurationException -from tests.helpers.boring_model import RandomDataset -from tests.helpers.runif import RunIf - - -@mock.patch.dict(os.environ, {"PL_DEV_DEBUG": "1"}) -@RunIf(min_gpus=2, special=True, fairscale_pipe=True) -def test_rpc_sequential_plugin_manual(tmpdir): - model = SequentialModelRPCManual() - trainer = Trainer( - max_epochs=2, - limit_train_batches=2, - limit_val_batches=2, - limit_test_batches=2, - gpus=2, - distributed_backend="ddp", - plugins=[RPCSequentialPlugin(balance=[2, 1], rpc_timeout_sec=5 * 60)], - ) - - trainer.fit(model) - - if torch_distrib.is_initialized() and torch_distrib.get_rank() == 0: - assert len(trainer.dev_debugger.pbar_added_metrics) > 0 - - if trainer.accelerator.rpc_enabled: - # Called at the end of trainer to ensure all processes are killed - trainer.accelerator.training_type_plugin.exit_rpc_process() - - -@RunIf(min_gpus=2, special=True, fairscale_pipe=True) -def test_rpc_sequential_plugin_manual_amp(tmpdir): - model = SequentialModelRPCManual() - trainer = Trainer( - max_epochs=2, - limit_train_batches=2, - limit_val_batches=2, - limit_test_batches=2, - gpus=2, - precision=16, - amp_backend="native", - distributed_backend="ddp", - plugins=[RPCSequentialPlugin(balance=[2, 1])], - ) - with pytest.raises( - MisconfigurationException, - match='`RPCSequentialPlugin` is currently not supported in Automatic Mixed Precision' - ): - trainer.fit(model) - - -@mock.patch.dict(os.environ, {"PL_DEV_DEBUG": "1"}) -@RunIf(min_gpus=2, special=True, fairscale_pipe=True) -def test_rpc_sequential_plugin_automatic(tmpdir): - model = SequentialModelRPCAutomatic() - trainer = Trainer( - max_epochs=2, - limit_train_batches=2, - limit_val_batches=2, - limit_test_batches=2, - gpus=2, - distributed_backend="ddp", - plugins=[RPCSequentialPlugin(balance=[2, 1])], - ) - - trainer.fit(model) - - if torch_distrib.is_initialized() and torch_distrib.get_rank() == 0: - assert len(trainer.dev_debugger.pbar_added_metrics) > 0 - - if trainer.accelerator.rpc_enabled: - # Called at the end of trainer to ensure all processes are killed - trainer.accelerator.training_type_plugin.exit_rpc_process() - - -@RunIf(min_gpus=2, special=True, fairscale_pipe=True) -def test_rpc_sequential_plugin_with_wrong_balance(tmpdir): - model = SequentialModelRPCAutomatic() - trainer = Trainer( - max_epochs=2, - limit_train_batches=2, - limit_val_batches=2, - limit_test_batches=2, - gpus=2, - distributed_backend="ddp", - plugins=[RPCSequentialPlugin(balance=[2, 2])], - ) - - with pytest.raises( - MisconfigurationException, match="The provided balance sum: 4 does not match your Sequential length: 3" - ): - trainer.fit(model) - - if trainer.accelerator.rpc_enabled: - # Called at the end of trainer to ensure all processes are killed - trainer.accelerator.training_type_plugin.exit_rpc_process() - - -class SequentialModelRPCManual(LightningModule): - - def __init__(self): - super().__init__() - self.sequential_module = nn.Sequential(torch.nn.Linear(32, 32), nn.ReLU(), nn.Linear(32, 2)) - self.automatic_optimization = False - - def forward(self, x): - return self.sequential_module(x) - - def loss(self, prediction): - # An arbitrary loss to have a loss that updates the model 
weights during `Trainer.fit` calls - return torch.nn.functional.mse_loss(prediction, torch.ones_like(prediction)) - - def step(self, x): - x = self(x) - out = torch.nn.functional.mse_loss(x, torch.ones_like(x)) - return out - - def training_step(self, batch, batch_idx): - opt = self.optimizers() - output = self.sequential_module(batch) - loss = self.loss(output) - self.log("train_loss", loss, on_epoch=True, prog_bar=True) - self.manual_backward(loss, opt) - assert torch.stack([torch.abs(p.grad).sum() for p in self.parameters()]).sum() > 0 - opt.step() - opt.zero_grad() - assert torch.stack([torch.abs(p.grad).sum() for p in self.parameters()]).sum() == 0 - - def validation_step(self, batch, batch_idx): - output = self.sequential_module(batch) - loss = self.loss(output) - return loss - - def test_step(self, batch, batch_idx): - output = self.sequential_module(batch) - return self.loss(batch, output) - - def configure_optimizers(self): - optimizer = torch.optim.SGD(self.parameters(), lr=0.1) - lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1) - return [optimizer], [lr_scheduler] - - def train_dataloader(self): - return torch.utils.data.DataLoader(RandomDataset(32, 64)) - - def val_dataloader(self): - return torch.utils.data.DataLoader(RandomDataset(32, 64)) - - def test_dataloader(self): - return torch.utils.data.DataLoader(RandomDataset(32, 64)) - - -class SequentialModelRPCAutomatic(SequentialModelRPCManual): - - def __init__(self): - super().__init__() - self.automatic_optimization = True - - def training_step(self, batch, batch_idx): - output = self.sequential_module(batch) - loss = self.loss(output) - self.log("train_loss", loss, on_epoch=True, prog_bar=True) - return loss
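
For readers following this removal: the deleted `conv_sequential_example.py` above was the only in-tree demonstration of `RPCSequentialPlugin`. As a point of reference, below is a minimal, hypothetical sketch of launching a comparable multi-GPU run with plugins that remain exported after this PR (`DDPShardedPlugin` and `DeepSpeedPlugin`, both still listed in `pytorch_lightning/plugins/__init__.py` in this diff). The helper name `train_without_rpc_sequential`, the 4-GPU count, and the choice of sharded or DeepSpeed training are illustrative assumptions, not an official migration path: sharded training reduces memory by partitioning optimizer state across the data-parallel group rather than by splitting an `nn.Sequential` across devices as the removed plugin did.

```python
# Hypothetical sketch only: shows a fit/test loop with plugins kept by this PR.
# It does NOT reproduce the pipeline (inter-layer) parallelism of the removed
# RPCSequentialPlugin; it is a still-supported way to reduce per-GPU memory.
import pytorch_lightning as pl
from pytorch_lightning.plugins import DDPShardedPlugin, DeepSpeedPlugin


def train_without_rpc_sequential(
    model: pl.LightningModule,
    datamodule: pl.LightningDataModule,
    use_deepspeed: bool = False,
) -> None:
    # Pick one of the plugins that survive this removal; both shard optimizer
    # state (DeepSpeed stage 2 also shards gradients) across data-parallel ranks.
    plugin = DeepSpeedPlugin(stage=2) if use_deepspeed else DDPShardedPlugin()
    trainer = pl.Trainer(
        gpus=4,          # illustrative: mirrors the 4-GPU run in the deleted example
        max_epochs=1,
        plugins=[plugin],
    )
    trainer.fit(model, datamodule)
    trainer.test(model, datamodule=datamodule)
```

Users who depended on true inter-layer pipeline parallelism are asked, per the CHANGELOG entry at the top of this diff, to open a GitHub discussion describing their use case.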