
Add LSF support #5102

Merged
merged 41 commits into master from lsf_env on Jul 9, 2021

Changes from 19 of 41 commits

Commits
9abe28e
add ClusterEnvironment for LSF systems
ajtritt Dec 3, 2020
f2e44c1
update init file
ajtritt Dec 4, 2020
615f08d
add available cluster environments
ajtritt Dec 4, 2020
86f2fa1
clean up LSFEnvironment
ajtritt Dec 9, 2020
b72b42d
add ddp_hpc as a distributed backend
ajtritt Dec 9, 2020
6a9a4ca
clean up SLURMEnvironment
ajtritt Dec 9, 2020
5bbba77
Merge branch 'master' into lsf_env
ajtritt Dec 9, 2020
94e4d4b
remove extra blank line
ajtritt Dec 9, 2020
113e787
init device for DDPHPCAccelerator
ajtritt Dec 10, 2020
d12d652
committing current state
ajtritt Dec 11, 2020
d0ac793
Merge branch 'master' into lsf_env
ajtritt Dec 11, 2020
b53d153
add additional methods to ClusterEnvironments
ajtritt Dec 11, 2020
0b6edfe
add NVIDIA mixin for setting up CUDA envars
ajtritt Dec 11, 2020
f7d87f6
remove troubleshooting prints
ajtritt Dec 12, 2020
3c9edf9
cleanup SLURMEnvironment
ajtritt Dec 12, 2020
77f3b71
fix docstring
ajtritt Dec 12, 2020
eb7d07c
cleanup TorchElasticEnvironment and add documentation
ajtritt Dec 12, 2020
09064e1
PEP8 puts a cork in it
ajtritt Dec 12, 2020
fb30942
Merge branch 'master' into lsf_env
ajtritt Dec 12, 2020
7be8f1d
add set_ranks_to_trainer
ajtritt Feb 11, 2021
5c04b8e
Merge remote-tracking branch 'pl/master' into lsf_env
ajtritt Feb 12, 2021
004daef
Merge remote-tracking branch 'pl/master' into lsf_env
ajtritt Feb 12, 2021
a113210
remove unused import
ajtritt Feb 12, 2021
d17281c
move to new location
ajtritt Feb 12, 2021
b4028a7
Merge branch 'master' into lsf_env
awaelchli Jul 9, 2021
7a23376
update LSF environment
awaelchli Jul 9, 2021
02410ff
remove mixin
awaelchli Jul 9, 2021
7f91740
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
1b3bc7a
changelog
awaelchli Jul 9, 2021
5ec0e9f
Merge remote-tracking branch 'ajtritt/lsf_env' into lsf_env
awaelchli Jul 9, 2021
92215ab
reset slurm env
awaelchli Jul 9, 2021
a613759
add tests
awaelchli Jul 9, 2021
f7c5e0e
add licence
awaelchli Jul 9, 2021
00de88e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
cfd59b8
test node_rank
awaelchli Jul 9, 2021
5ec99e9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
cfe544f
add lsf env to docs
awaelchli Jul 9, 2021
71569de
add auto detection for lsf environment
awaelchli Jul 9, 2021
7c26b41
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
077964d
fix is_using_lsf() and test
awaelchli Jul 9, 2021
7f127c8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
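
The auto-detection commits above ("add auto detection for lsf environment", "fix is_using_lsf() and test") gate the new environment behind an is_using_lsf() check. Below is a minimal sketch of what such a check could look like, assuming it keys off standard LSF batch variables; the exact variables inspected by the PR are not shown in this excerpt.

import os


def is_using_lsf() -> bool:
    """Plausible detection sketch: treat the process as running under LSF when the
    standard LSF batch variables are present. The real check in this PR may require
    additional variables (e.g. jsrun-specific ones)."""
    required = ("LSB_JOBID", "LSB_HOSTS")  # set by the LSF batch system for every job
    return all(var in os.environ for var in required)
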
24 changes: 24 additions & 0 deletions docs/source/accelerators.rst
@@ -180,3 +180,27 @@ TPU Accelerator

.. autoclass:: pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator
:noindex:

------------

*****************************
Available ClusterEnvironments
*****************************

SLURM Environment
=================

.. autoclass:: pytorch_lightning.cluster_environments.slurm_environment.SLURMEnvironment
:noindex:

LSF Environment
===============

.. autoclass:: pytorch_lightning.cluster_environments.lsf_environment.LSFEnvironment
:noindex:

TorchElastic Environment
========================

.. autoclass:: pytorch_lightning.cluster_environments.torchelastic_environment.TorchElasticEnvironment
:noindex:
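
For context, here is a minimal usage sketch of the newly documented LSF environment. The import path follows the docs entries above; passing the environment through Trainer(plugins=...) and the zero-argument constructor are assumptions about the registration mechanism, which this diff does not show.

from pytorch_lightning import Trainer
from pytorch_lightning.cluster_environments.lsf_environment import LSFEnvironment

# Hypothetical: explicitly select the LSF environment instead of relying on auto detection.
trainer = Trainer(
    gpus=6,                      # illustrative: GPUs per node on the LSF cluster
    num_nodes=4,                 # illustrative: number of nodes in the bsub/jsrun allocation
    distributed_backend="ddp",   # the DDP-based backends are the ones wired to cluster environments
    plugins=[LSFEnvironment()],  # assumed constructor signature
)
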
5 changes: 2 additions & 3 deletions pytorch_lightning/accelerators/accelerator.py
@@ -148,14 +148,13 @@ def setup_optimizers(self, model):
self.trainer.optimizer_frequencies = optimizer_frequencies

def init_ddp_connection(
self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True
self, global_rank: int, world_size: int
) -> None:
self.ddp_plugin.init_ddp_connection(
self.trainer,
self.cluster_environment,
global_rank,
world_size,
is_slurm_managing_tasks,
world_size
)

def sync_tensor(self,
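
The dropped is_slurm_managing_tasks flag is replaced by information the cluster environment itself provides. The sketch below shows the interface the accelerators now lean on: local_rank() and node_rank() appear elsewhere in this diff, while the remaining method names are assumptions based on the SLURM/TorchElastic environments of this era.

import os


class ExampleClusterEnvironment:
    """Sketch of the per-cluster information formerly signalled via is_slurm_managing_tasks."""

    def master_address(self) -> str:
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def master_port(self) -> int:
        return int(os.environ.get("MASTER_PORT", 12910))

    def world_size(self) -> int:
        return int(os.environ.get("WORLD_SIZE", 1))

    def local_rank(self) -> int:
        return int(os.environ.get("LOCAL_RANK", 0))

    def node_rank(self) -> int:
        return int(os.environ.get("NODE_RANK", 0))
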
135 changes: 25 additions & 110 deletions pytorch_lightning/accelerators/accelerator_connector.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

import torch

@@ -121,16 +122,6 @@ def on_trainer_init(
self.trainer.world_size = 1
self.trainer.interactive_ddp_procs = []

# link up SLURM
# TODO: this should be taken out of here... but depends too much on DDP
self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes)
self.trainer.node_rank = self.determine_ddp_node_rank()
self.trainer.local_rank = self.determine_local_rank()
self.trainer.global_rank = 0

# NVIDIA setup
self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids)

self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE')

self.trainer.replace_sampler_ddp = replace_sampler_ddp
@@ -151,12 +142,8 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend):
return distributed_backend

def _select_environment(self):
if self.trainer.plugin_connector.cloud_environment:
env = self.trainer.plugin_connector.cloud_environment
elif self.trainer.is_slurm_managing_tasks:
env = SLURMEnvironment()
elif self._is_using_torchelastic():
env = TorchElasticEnvironment()
if self.trainer.plugin_connector.cluster_environment:
env = self.trainer.plugin_connector.cluster_environment
else:
env = TorchElasticEnvironment()
return env
@@ -182,84 +169,14 @@ def select_accelerator(self):
# ----------------------------------
# choose an accelerator for the user
# ----------------------------------
use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks

# torchelastic or general non_slurm ddp
te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ)
use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed

use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn"
use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu"

use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic()
use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks

# ddp script mode uses the same flags as TE
# TODO: decouple from TE
if os.environ.get('PL_IN_DDP_SUBPROCESS', False):
use_torchelastic_ddp = False

cluster_env = self._select_environment()

# choose the appropriate accelerator backend
if self.trainer.use_ddp2:
accelerator_backend = accelerators.DDP2Accelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_cpu_slurm:
accelerator_backend = accelerators.DDPCPUHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_slurm_ddp:
accelerator_backend = accelerators.DDPHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_cpu_torch_elastic:
accelerator_backend = accelerators.DDPCPUHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_torchelastic_ddp:
accelerator_backend = accelerators.DDPHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_spawn:
accelerator_backend = accelerators.DDPSpawnAccelerator(
self.trainer,
nprocs=self.trainer.num_processes,
cluster_environment=cluster_env,
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_cpu_spawn:
accelerator_backend = accelerators.DDPCPUSpawnAccelerator(
self.trainer,
nprocs=self.trainer.num_processes,
cluster_environment=cluster_env,
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
)

elif self.trainer.distributed_backend == "ddp":
accelerator_backend = accelerators.DDPAccelerator(
self.trainer,
cluster_env,
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
)

elif self.trainer.use_dp:
accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env)

@@ -274,6 +191,27 @@ def select_accelerator(self):

elif self.trainer.distributed_backend is None:
accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env)

elif self.trainer.use_ddp:
spawn = self.trainer.distributed_backend == "ddp_spawn"
use_cpu = self.trainer.gpus is None

acc_args = [self.trainer]
acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin}
ddp_cls = None
if use_cpu:
if spawn:
ddp_cls = accelerators.DDPCPUSpawnAccelerator
acc_kwargs['nprocs'] = self.trainer.num_processes
else:
ddp_cls = accelerators.DDPCPUHPCAccelerator
else:
if spawn:
ddp_cls = accelerators.DDPSpawnAccelerator
acc_kwargs['nprocs'] = self.trainer.num_processes
else:
ddp_cls = accelerators.DDPHPCAccelerator
accelerator_backend = ddp_cls(*acc_args, **acc_kwargs)
else:
raise MisconfigurationException(
f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend'
@@ -382,34 +320,11 @@ def has_horovodrun():
"""Returns True if running with `horovodrun` using Gloo or OpenMPI."""
return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ

def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids):
def set_nvidia_flags(self, data_parallel_device_ids):
if data_parallel_device_ids is None:
return

# set the correct cuda visible devices (using pci order)
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())])
devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids)
log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]')

def determine_local_rank(self):
if self.trainer.is_slurm_managing_tasks:
return int(os.environ['SLURM_LOCALID'])
return int(os.environ.get('LOCAL_RANK', 0))

def determine_ddp_node_rank(self):
if self.trainer.is_slurm_managing_tasks:
return int(os.environ['SLURM_NODEID'])

# torchelastic uses the envvar GROUP_RANK, whereas other systems(?) use NODE_RANK.
# otherwise use given node rank or default to node rank 0
env_vars = ['NODE_RANK', 'GROUP_RANK']
node_ids = [(k, os.environ.get(k, None)) for k in env_vars]
node_ids = [(k, v) for k, v in node_ids if v is not None]
if len(node_ids) == 0:
return 0
if len(node_ids) > 1:
log.warning(f"Multiple environment variables ({node_ids}) defined for node rank. Using the first one.")
k, rank = node_ids.pop()
rank_zero_info(f"Using environment variable {k} for node rank ({rank}).")
return int(rank)
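
The node-rank lookup removed from the connector above does not disappear; the PR makes it the cluster environments' responsibility. For reference, here is the same logic as a standalone helper — a sketch mirroring the deleted determine_ddp_node_rank, not the new per-environment implementations.

import logging
import os

log = logging.getLogger(__name__)


def resolve_node_rank() -> int:
    """Mirror of the deleted determine_ddp_node_rank: torchelastic exports GROUP_RANK,
    other launchers export NODE_RANK; default to rank 0 if neither is set."""
    candidates = [(k, os.environ[k]) for k in ("NODE_RANK", "GROUP_RANK") if k in os.environ]
    if not candidates:
        return 0
    if len(candidates) > 1:
        log.warning(f"Multiple environment variables {candidates} defined for node rank. Using the first one.")
    key, rank = candidates[0]
    log.info(f"Using environment variable {key} for node rank ({rank}).")
    return int(rank)
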
3 changes: 1 addition & 2 deletions pytorch_lightning/accelerators/ddp2_accelerator.py
@@ -150,8 +150,7 @@ def ddp_train(self, process_idx, mp_queue, model):
model.trainer = self.trainer
self.init_ddp_connection(
self.trainer.global_rank,
self.trainer.world_size,
self.trainer.is_slurm_managing_tasks
self.trainer.world_size
)

if isinstance(self.ddp_plugin, RPCPlugin):
3 changes: 1 addition & 2 deletions pytorch_lightning/accelerators/ddp_accelerator.py
@@ -251,8 +251,7 @@ def ddp_train(self, process_idx, model):
model.trainer = self.trainer
self.init_ddp_connection(
self.trainer.global_rank,
self.trainer.world_size,
self.trainer.is_slurm_managing_tasks
self.trainer.world_size
)

if isinstance(self.ddp_plugin, RPCPlugin):
2 changes: 1 addition & 1 deletion pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py
@@ -42,7 +42,7 @@ def __init__(self,
super().__init__(trainer, cluster_environment, ddp_plugin)
self.nickname = 'ddp_cpu'

def model_to_device(self, model, process_idx):
def model_to_device(self, model):
model.cpu()

def get_device_ids(self):
pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py
@@ -212,7 +212,7 @@ def set_world_ranks(self, process_idx):
self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes

def model_to_device(self, model, process_idx):
def model_to_device(self, model):
model.cpu()

def get_device_ids(self):
15 changes: 11 additions & 4 deletions pytorch_lightning/accelerators/ddp_hpc_accelerator.py
@@ -21,6 +21,7 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed.dist import LightningDistributed
@@ -34,7 +35,7 @@
from hydra.utils import get_original_cwd, to_absolute_path


class DDPHPCAccelerator(Accelerator):
class DDPHPCAccelerator(Accelerator, NVIDIAMixin):

def __init__(self,
trainer,
@@ -57,6 +58,10 @@ def __init__(self,

def setup(self, model):
self.trainer.model = model
# ----------------------------
# NVIDIA FLAGS
# ----------------------------
self.set_nvidia_flags(self.trainer.data_parallel_device_ids)
self.task_idx = self.cluster_environment.local_rank()

def train(self):
@@ -65,7 +70,7 @@ def train(self):

def set_world_ranks(self, process_idx):
self.trainer.local_rank = process_idx
self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx
self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes

def init_device(self, process_idx):
@@ -134,14 +139,16 @@ def ddp_train(self, process_idx, model):
# set warning rank
rank_zero_only.rank = self.trainer.global_rank

# Initialize cuda device
self.init_device(process_idx)

# set up server using proc 0's ip address
# try to init for 20 times at max in case ports are taken
# where to store ip_table
model.trainer = self.trainer
self.init_ddp_connection(
self.trainer.global_rank,
self.trainer.world_size,
self.trainer.is_slurm_managing_tasks
self.trainer.world_size
)

if isinstance(self.ddp_plugin, RPCPlugin):
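
The NVIDIAMixin referenced by the HPC, spawn, and GPU accelerators is not included in this excerpt. Below is a sketch of what it plausibly contains, reusing the set_nvidia_flags body visible in the accelerator_connector.py diff above; the module location and any additional members of the real mixin are assumptions.

import os

import torch

from pytorch_lightning import _logger as log


class NVIDIAMixin:
    """Shared CUDA environment setup pulled out of the accelerator connector.
    Assumes the host accelerator class exposes self.trainer."""

    def set_nvidia_flags(self, data_parallel_device_ids):
        if data_parallel_device_ids is None:
            return
        # enumerate GPUs in PCI bus order so device indices are stable across tools
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        all_gpu_ids = ",".join(str(x) for x in range(torch.cuda.device_count()))
        devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids)
        log.info(f"LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]")
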
7 changes: 6 additions & 1 deletion pytorch_lightning/accelerators/ddp_spawn_accelerator.py
@@ -22,6 +22,7 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed import LightningDistributed
@@ -44,7 +45,7 @@
from hydra.utils import get_original_cwd, to_absolute_path


class DDPSpawnAccelerator(Accelerator):
class DDPSpawnAccelerator(Accelerator, NVIDIAMixin):

def __init__(self,
trainer,
@@ -68,6 +69,10 @@ def __init__(self,

def setup(self, model):
os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port()))
# ----------------------------
# NVIDIA FLAGS
# ----------------------------
self.set_nvidia_flags(self.trainer.data_parallel_device_ids)

# pass in a state q
smp = mp.get_context('spawn')
7 changes: 6 additions & 1 deletion pytorch_lightning/accelerators/gpu_accelerator.py
@@ -17,12 +17,13 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.utilities import AMPType


class GPUAccelerator(Accelerator):
class GPUAccelerator(Accelerator, NVIDIAMixin):
amp_backend: AMPType

def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None):
@@ -40,6 +41,10 @@ def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] =
self.nickname = None

def setup(self, model):
# ----------------------------
# NVIDIA FLAGS
# ----------------------------
self.set_nvidia_flags(self.trainer.data_parallel_device_ids)

# call setup
self.trainer.call_setup_hook(model)