Skip to content

Commit

Permalink
ref: enable custom clusters (1/n) (#4048)
Browse files Browse the repository at this point in the history
* enable cluster plugins

* enable cluster plugins + test backend choices

* enable cluster plugins + test backend choices

* enable cluster plugins + test backend choices

* enable cluster plugins + test backend choices

* enable cluster plugins + test backend choices

* enable cluster plugins + test backend choices
  • Loading branch information
williamFalcon committed Oct 10, 2020
1 parent efec8c7 commit 2b255a3
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 58 deletions.
46 changes: 28 additions & 18 deletions pytorch_lightning/accelerators/accelerator_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ def on_trainer_init(
self.trainer.replace_sampler_ddp = replace_sampler_ddp

def _select_environment(self):
env = None

if self.trainer.plugin_connector.cloud_environment:
return self.trainer.plugin_connector.cloud_environment
elif self._is_using_torchelastic():
env = TorchElasticEnvironment()
env = self.trainer.plugin_connector.cloud_environment
elif self.trainer.is_slurm_managing_tasks:
env = SLURMEnvironment()
elif self._is_using_torchelastic():
env = TorchElasticEnvironment()
else:
env = TorchElasticEnvironment()
return env

def _is_using_torchelastic(self):
Expand Down Expand Up @@ -163,45 +163,55 @@ def select_accelerator(self):
if os.environ.get('PL_DDP_PID', False):
use_torchelastic_ddp = False

cluster_env = self._select_environment()

# choose the appropriate accelerator backend
if self.trainer.use_ddp2:
accelerator_backend = accelerators.DDP2Backend(self.trainer)
accelerator_backend = accelerators.DDP2Backend(self.trainer, cluster_env)

elif use_ddp_cpu_slurm:
accelerator_backend = accelerators.DDPCPUSLURMBackend(self.trainer)
accelerator_backend = accelerators.DDPCPUSLURMBackend(self.trainer, cluster_env)

elif use_slurm_ddp:
accelerator_backend = accelerators.DDPSLURMBackend(self.trainer)
accelerator_backend = accelerators.DDPSLURMBackend(self.trainer, cluster_env)

elif use_ddp_cpu_torch_elastic:
accelerator_backend = accelerators.DDPCPUTorchElasticBackend(self.trainer)
accelerator_backend = accelerators.DDPCPUTorchElasticBackend(self.trainer, cluster_env)

elif use_torchelastic_ddp:
accelerator_backend = accelerators.DDPTorchElasticBackend(self.trainer)
accelerator_backend = accelerators.DDPTorchElasticBackend(self.trainer, cluster_env)

elif use_ddp_spawn:
accelerator_backend = accelerators.DDPSpawnBackend(self.trainer, nprocs=self.trainer.num_processes)
accelerator_backend = accelerators.DDPSpawnBackend(
self.trainer,
nprocs=self.trainer.num_processes,
cluster_environment=cluster_env
)

elif use_ddp_cpu_spawn:
accelerator_backend = accelerators.DDPCPUSpawnBackend(self.trainer, nprocs=self.trainer.num_processes)
accelerator_backend = accelerators.DDPCPUSpawnBackend(
self.trainer,
nprocs=self.trainer.num_processes,
cluster_environment=cluster_env
)

elif self.trainer.distributed_backend == "ddp":
accelerator_backend = accelerators.DDPBackend(self.trainer)
accelerator_backend = accelerators.DDPBackend(self.trainer, cluster_env)

elif self.trainer.use_dp:
accelerator_backend = accelerators.DataParallelBackend(self.trainer)
accelerator_backend = accelerators.DataParallelBackend(self.trainer, cluster_env)

elif self.trainer.use_horovod:
accelerator_backend = accelerators.HorovodBackend(self.trainer)
accelerator_backend = accelerators.HorovodBackend(self.trainer, cluster_env)

elif self.trainer.use_single_gpu:
accelerator_backend = accelerators.GPUBackend(self.trainer)
accelerator_backend = accelerators.GPUBackend(self.trainer, cluster_env)

elif self.trainer.use_tpu:
accelerator_backend = accelerators.TPUBackend(self.trainer)
accelerator_backend = accelerators.TPUBackend(self.trainer, cluster_env)

elif self.trainer.distributed_backend is None:
accelerator_backend = accelerators.CPUBackend(self.trainer)
accelerator_backend = accelerators.CPUBackend(self.trainer, cluster_env)
else:
raise MisconfigurationException(
f'Trainer(distributed_backend={self.trainer.distributed_backend} is not a supported backend'
Expand Down
42 changes: 3 additions & 39 deletions pytorch_lightning/accelerators/base_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,45 +195,9 @@ def setup_optimizers(self, model):
def init_ddp_connection(
self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True
) -> None:
if is_slurm_managing_tasks:
self.trainer.slurm_connector.connect_ddp(global_rank, world_size)
else:
self.connect_torchelastic(global_rank, world_size)

def connect_torchelastic(
self, global_rank: int, world_size: int
) -> None:
"""
Override to define your custom way of setting up a distributed environment.
Lightning's implementation uses env:// init by default and sets the first node as root
for SLURM managed cluster.
Args:
global_rank: The global process idx.
world_size: Number of GPUs being use across all nodes. (num_nodes * num_gpus).
"""

if "MASTER_ADDR" not in os.environ:
rank_zero_warn(
"MASTER_ADDR environment variable is not defined. Set as localhost"
)
os.environ["MASTER_ADDR"] = "127.0.0.1"
log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")

if "MASTER_PORT" not in os.environ:
rank_zero_warn(
"MASTER_PORT environment variable is not defined. Set as 12910"
)
os.environ["MASTER_PORT"] = "12910"
log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")

if "WORLD_SIZE" in os.environ and int(os.environ["WORLD_SIZE"]) != world_size:
rank_zero_warn(
f"WORLD_SIZE environment variable ({os.environ['WORLD_SIZE']}) "
f"is not equal to the computed world size ({world_size}). Ignored."
)

os.environ["MASTER_ADDR"] = str(self.cluster_environment.master_address())
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["WORLD_SIZE"] = str(self.cluster_environment.world_size())
torch_backend = "nccl" if self.trainer.on_gpu else "gloo"

if not torch.distributed.is_initialized():
Expand Down
2 changes: 1 addition & 1 deletion pytorch_lightning/trainer/connectors/slurm_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def configure_slurm_ddp(self, num_gpu_nodes):
# extract SLURM flag vars
# whenever we have the correct number of tasks, we let slurm manage processes
# otherwise we launch the required number of processes
if self.trainer.use_ddp:
if self.trainer.use_ddp or self.trainer.use_ddp2:
self.trainer.num_requested_gpus = self.trainer.num_gpus * num_gpu_nodes
self.trainer.num_slurm_tasks = 0
try:
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ omit =
pytorch_lightning/accelerators/ddp2_*.py
pytorch_lightning/accelerators/dp_*.py
pytorch_lightning/accelerators/tpu_*.py
pytorch_lightning/cluster_environments/*.py

[flake8]
# TODO: this should be 88 or 100 according PEP8
Expand Down
81 changes: 81 additions & 0 deletions tests/backends/test_accelerator_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from tests.base.boring_model import BoringModel
from pytorch_lightning.callbacks import Callback
from pytorch_lightning import accelerators, Trainer
from pytorch_lightning.cluster_environments import SLURMEnvironment, TorchElasticEnvironment, ClusterEnvironment
from unittest import mock


def test_accelerator_choice_cpu(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert isinstance(trainer.accelerator_backend, accelerators.CPUBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)

model = BoringModel()
trainer = Trainer(
Expand All @@ -36,7 +38,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp_cpu(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSpawnBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -55,7 +59,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -75,7 +81,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp_spawn(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPSpawnBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -101,7 +109,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp_slurm(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPSLURMBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -128,7 +138,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp2_slurm(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp2
assert isinstance(trainer.accelerator_backend, accelerators.DDP2Backend)
assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -153,7 +165,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp_te(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPTorchElasticBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -168,6 +182,33 @@ def on_fit_start(self, trainer, pl_module):
trainer.fit(model)


@mock.patch.dict(os.environ, {
"CUDA_VISIBLE_DEVICES": "0,1",
"WORLD_SIZE": "2",
"LOCAL_RANK": "0",
"NODE_RANK": "0"
})
@mock.patch('torch.cuda.device_count', return_value=2)
def test_accelerator_choice_ddp2_te(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp2
assert isinstance(trainer.accelerator_backend, accelerators.DDP2Backend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
raise SystemExit()

model = BoringModel()
trainer = Trainer(
fast_dev_run=True,
distributed_backend='ddp2',
gpus=2,
callbacks=[CB()]
)

with pytest.raises(SystemExit):
trainer.fit(model)


@mock.patch.dict(os.environ, {
"WORLD_SIZE": "1",
"LOCAL_RANK": "0",
Expand All @@ -177,7 +218,9 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp_cpu_te(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUTorchElasticBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, TorchElasticEnvironment)
raise SystemExit()

model = BoringModel()
Expand All @@ -203,11 +246,49 @@ def on_fit_start(self, trainer, pl_module):
def test_accelerator_choice_ddp_cpu_slurm(tmpdir):
class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSLURMBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, SLURMEnvironment)
raise SystemExit()

model = BoringModel()
trainer = Trainer(
fast_dev_run=True,
distributed_backend='ddp_cpu',
num_processes=1,
callbacks=[CB()]
)

with pytest.raises(SystemExit):
trainer.fit(model)


@mock.patch.dict(os.environ, {
"SLURM_NTASKS": "1",
"SLURM_JOB_NAME": "SOME_NAME",
"SLURM_NODEID": "0",
"LOCAL_RANK": "0",
"SLURM_LOCALID": "0"
})
@mock.patch('torch.cuda.device_count', return_value=0)
def test_accelerator_choice_ddp_cpu_custom_cluster(tmpdir):
"""
Test that we choose the custom cluster even when SLURM or TE flags are around
"""
class CustomCluster(ClusterEnvironment):
def master_address(self):
return 'asdf'

class CB(Callback):
def on_fit_start(self, trainer, pl_module):
assert trainer.use_ddp
assert isinstance(trainer.accelerator_backend, accelerators.DDPCPUSLURMBackend)
assert isinstance(trainer.accelerator_backend.cluster_environment, CustomCluster)
raise SystemExit()

model = BoringModel()
trainer = Trainer(
plugins=[CustomCluster()],
fast_dev_run=True,
distributed_backend='ddp_cpu',
num_processes=1,
Expand Down

0 comments on commit 2b255a3

Please sign in to comment.