diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 3aef29f94851a3..21f72b1949aa44 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -137,14 +137,14 @@ def on_trainer_init( self.trainer.replace_sampler_ddp = replace_sampler_ddp def _select_environment(self): - env = None - if self.trainer.plugin_connector.cloud_environment: - return self.trainer.plugin_connector.cloud_environment - elif self._is_using_torchelastic(): - env = TorchElasticEnvironment() + env = self.trainer.plugin_connector.cloud_environment elif self.trainer.is_slurm_managing_tasks: env = SLURMEnvironment() + elif self._is_using_torchelastic(): + env = TorchElasticEnvironment() + else: + env = TorchElasticEnvironment() return env def _is_using_torchelastic(self): @@ -184,45 +184,55 @@ def select_accelerator(self): if os.environ.get('PL_DDP_PID', False): use_torchelastic_ddp = False + cluster_env = self._select_environment() + # choose the appropriate accelerator backend if self.trainer.use_ddp2: - accelerator_backend = accelerators.DDP2Backend(self.trainer) + accelerator_backend = accelerators.DDP2Backend(self.trainer, cluster_env) elif use_ddp_cpu_slurm: - accelerator_backend = accelerators.DDPCPUSLURMBackend(self.trainer) + accelerator_backend = accelerators.DDPCPUSLURMBackend(self.trainer, cluster_env) elif use_slurm_ddp: - accelerator_backend = accelerators.DDPSLURMBackend(self.trainer) + accelerator_backend = accelerators.DDPSLURMBackend(self.trainer, cluster_env) elif use_ddp_cpu_torch_elastic: - accelerator_backend = accelerators.DDPCPUTorchElasticBackend(self.trainer) + accelerator_backend = accelerators.DDPCPUTorchElasticBackend(self.trainer, cluster_env) elif use_torchelastic_ddp: - accelerator_backend = accelerators.DDPTorchElasticBackend(self.trainer) + accelerator_backend = accelerators.DDPTorchElasticBackend(self.trainer, cluster_env) elif use_ddp_spawn: - accelerator_backend = accelerators.DDPSpawnBackend(self.trainer, nprocs=self.trainer.num_processes) + accelerator_backend = accelerators.DDPSpawnBackend( + self.trainer, + nprocs=self.trainer.num_processes, + cluster_environment=cluster_env + ) elif use_ddp_cpu_spawn: - accelerator_backend = accelerators.DDPCPUSpawnBackend(self.trainer, nprocs=self.trainer.num_processes) + accelerator_backend = accelerators.DDPCPUSpawnBackend( + self.trainer, + nprocs=self.trainer.num_processes, + cluster_environment=cluster_env + ) elif self.trainer.distributed_backend == "ddp": - accelerator_backend = accelerators.DDPBackend(self.trainer) + accelerator_backend = accelerators.DDPBackend(self.trainer, cluster_env) elif self.trainer.use_dp: - accelerator_backend = accelerators.DataParallelBackend(self.trainer) + accelerator_backend = accelerators.DataParallelBackend(self.trainer, cluster_env) elif self.trainer.use_horovod: - accelerator_backend = accelerators.HorovodBackend(self.trainer) + accelerator_backend = accelerators.HorovodBackend(self.trainer, cluster_env) elif self.trainer.use_single_gpu: - accelerator_backend = accelerators.GPUBackend(self.trainer) + accelerator_backend = accelerators.GPUBackend(self.trainer, cluster_env) elif self.trainer.use_tpu: - accelerator_backend = accelerators.TPUBackend(self.trainer) + accelerator_backend = accelerators.TPUBackend(self.trainer, cluster_env) elif self.trainer.distributed_backend is None: - accelerator_backend = accelerators.CPUBackend(self.trainer) + accelerator_backend = accelerators.CPUBackend(self.trainer, cluster_env) else: raise MisconfigurationException( f'Trainer(distributed_backend={self.trainer.distributed_backend} is not a supported backend' diff --git a/pytorch_lightning/accelerators/base_accelerator.py b/pytorch_lightning/accelerators/base_accelerator.py index 5a5c134862e785..59a441040d5798 100644 --- a/pytorch_lightning/accelerators/base_accelerator.py +++ b/pytorch_lightning/accelerators/base_accelerator.py @@ -198,45 +198,9 @@ def setup_optimizers(self, model): def init_ddp_connection( self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True ) -> None: - if is_slurm_managing_tasks: - self.trainer.slurm_connector.connect_ddp(global_rank, world_size) - else: - self.connect_torchelastic(global_rank, world_size) - - def connect_torchelastic( - self, global_rank: int, world_size: int - ) -> None: - """ - Override to define your custom way of setting up a distributed environment. - - Lightning's implementation uses env:// init by default and sets the first node as root - for SLURM managed cluster. - - Args: - global_rank: The global process idx. - world_size: Number of GPUs being use across all nodes. (num_nodes * num_gpus). - """ - - if "MASTER_ADDR" not in os.environ: - rank_zero_warn( - "MASTER_ADDR environment variable is not defined. Set as localhost" - ) - os.environ["MASTER_ADDR"] = "127.0.0.1" - log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") - - if "MASTER_PORT" not in os.environ: - rank_zero_warn( - "MASTER_PORT environment variable is not defined. Set as 12910" - ) - os.environ["MASTER_PORT"] = "12910" - log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - - if "WORLD_SIZE" in os.environ and int(os.environ["WORLD_SIZE"]) != world_size: - rank_zero_warn( - f"WORLD_SIZE environment variable ({os.environ['WORLD_SIZE']}) " - f"is not equal to the computed world size ({world_size}). Ignored." - ) - + os.environ["MASTER_ADDR"] = str(self.cluster_environment.master_address()) + os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port()) + os.environ["WORLD_SIZE"] = str(self.cluster_environment.world_size()) torch_backend = "nccl" if self.trainer.on_gpu else "gloo" if not torch.distributed.is_initialized(): diff --git a/pytorch_lightning/trainer/connectors/slurm_connector.py b/pytorch_lightning/trainer/connectors/slurm_connector.py index a6a1cdbfb59808..4a420efa63060d 100644 --- a/pytorch_lightning/trainer/connectors/slurm_connector.py +++ b/pytorch_lightning/trainer/connectors/slurm_connector.py @@ -22,7 +22,7 @@ def configure_slurm_ddp(self, num_gpu_nodes): # extract SLURM flag vars # whenever we have the correct number of tasks, we let slurm manage processes # otherwise we launch the required number of processes - if self.trainer.use_ddp: + if self.trainer.use_ddp or self.trainer.use_ddp2: self.trainer.num_requested_gpus = self.trainer.num_gpus * num_gpu_nodes self.trainer.num_slurm_tasks = 0 try: diff --git a/setup.cfg b/setup.cfg index ad6d2ef9bae02b..93acd1e0c0b8e4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,6 +43,7 @@ omit = pytorch_lightning/accelerators/ddp2_*.py pytorch_lightning/accelerators/dp_*.py pytorch_lightning/accelerators/tpu_*.py + pytorch_lightning/cluster_environments/*.py [flake8] # TODO: this should be 88 or 100 according PEP8 diff --git a/tests/backends/test_accelerator_connector.py b/tests/backends/test_accelerator_connector.py index 139b612f38e24f..2ae83ac14698a2 100644 --- a/tests/backends/test_accelerator_connector.py +++ b/tests/backends/test_accelerator_connector.py @@ -18,6 +18,7 @@ from pytorch_lightning.callbacks import Callback from pytorch_lightning import accelerators, Trainer from pytorch_lightning.accelerators import Accelerator +from pytorch_lightning.cluster_environments import SLURMEnvironment, TorchElasticEnvironment, ClusterEnvironment from unittest import mock @@ -25,6 +26,7 @@ def test_accelerator_choice_cpu(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): assert isinstance(trainer.accelerator_backend, accelerators.CPUBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) model = BoringModel() trainer = Trainer( @@ -37,7 +39,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp_cpu(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSpawnBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) raise SystemExit() model = BoringModel() @@ -56,7 +60,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) raise SystemExit() model = BoringModel() @@ -76,7 +82,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp_spawn(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPSpawnBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) raise SystemExit() model = BoringModel() @@ -102,7 +110,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp_slurm(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPSLURMBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment) raise SystemExit() model = BoringModel() @@ -129,7 +139,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp2_slurm(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp2 assert isinstance(trainer.accelerator_backend, accelerators.DDP2Backend) + assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment) raise SystemExit() model = BoringModel() @@ -154,7 +166,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp_te(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPTorchElasticBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) raise SystemExit() model = BoringModel() @@ -169,6 +183,33 @@ def on_fit_start(self, trainer, pl_module): trainer.fit(model) +@mock.patch.dict(os.environ, { + "CUDA_VISIBLE_DEVICES": "0,1", + "WORLD_SIZE": "2", + "LOCAL_RANK": "0", + "NODE_RANK": "0" +}) +@mock.patch('torch.cuda.device_count', return_value=2) +def test_accelerator_choice_ddp2_te(tmpdir): + class CB(Callback): + def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp2 + assert isinstance(trainer.accelerator_backend, accelerators.DDP2Backend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) + raise SystemExit() + + model = BoringModel() + trainer = Trainer( + fast_dev_run=True, + distributed_backend='ddp2', + gpus=2, + callbacks=[CB()] + ) + + with pytest.raises(SystemExit): + trainer.fit(model) + + @mock.patch.dict(os.environ, { "WORLD_SIZE": "1", "LOCAL_RANK": "0", @@ -178,7 +219,9 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp_cpu_te(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUTorchElasticBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment) raise SystemExit() model = BoringModel() @@ -204,11 +247,49 @@ def on_fit_start(self, trainer, pl_module): def test_accelerator_choice_ddp_cpu_slurm(tmpdir): class CB(Callback): def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp + assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSLURMBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment) + raise SystemExit() + + model = BoringModel() + trainer = Trainer( + fast_dev_run=True, + distributed_backend='ddp_cpu', + num_processes=1, + callbacks=[CB()] + ) + + with pytest.raises(SystemExit): + trainer.fit(model) + + +@mock.patch.dict(os.environ, { + "SLURM_NTASKS": "1", + "SLURM_JOB_NAME": "SOME_NAME", + "SLURM_NODEID": "0", + "LOCAL_RANK": "0", + "SLURM_LOCALID": "0" +}) +@mock.patch('torch.cuda.device_count', return_value=0) +def test_accelerator_choice_ddp_cpu_custom_cluster(tmpdir): + """ + Test that we choose the custom cluster even when SLURM or TE flags are around + """ + class CustomCluster(ClusterEnvironment): + def master_address(self): + return 'asdf' + + class CB(Callback): + def on_fit_start(self, trainer, pl_module): + assert trainer.use_ddp assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSLURMBackend) + assert isinstance(trainer.accelerator_backend.cluster_environment, CustomCluster) raise SystemExit() model = BoringModel() trainer = Trainer( + plugins=[CustomCluster()], fast_dev_run=True, distributed_backend='ddp_cpu', num_processes=1,