Add LSF support #5102

Merged
merged 41 commits on Jul 9, 2021
Changes from 24 commits
Commits
41 commits
9abe28e
add ClusterEnvironment for LSF systems
ajtritt Dec 3, 2020
f2e44c1
update init file
ajtritt Dec 4, 2020
615f08d
add available cluster environments
ajtritt Dec 4, 2020
86f2fa1
clean up LSFEnvironment
ajtritt Dec 9, 2020
b72b42d
add ddp_hpc as a distributed backend
ajtritt Dec 9, 2020
6a9a4ca
clean up SLURMEnvironment
ajtritt Dec 9, 2020
5bbba77
Merge branch 'master' into lsf_env
ajtritt Dec 9, 2020
94e4d4b
remove extra blank line
ajtritt Dec 9, 2020
113e787
init device for DDPHPCAccelerator
ajtritt Dec 10, 2020
d12d652
committing current state
ajtritt Dec 11, 2020
d0ac793
Merge branch 'master' into lsf_env
ajtritt Dec 11, 2020
b53d153
add additional methods to ClusterEnvironments
ajtritt Dec 11, 2020
0b6edfe
add NVIDIA mixin for setting up CUDA envars
ajtritt Dec 11, 2020
f7d87f6
remove troubleshooting prints
ajtritt Dec 12, 2020
3c9edf9
cleanup SLURMEnvironment
ajtritt Dec 12, 2020
77f3b71
fix docstring
ajtritt Dec 12, 2020
eb7d07c
cleanup TorchElasticEnvironment and add documentation
ajtritt Dec 12, 2020
09064e1
PEP8 puts a cork in it
ajtritt Dec 12, 2020
fb30942
Merge branch 'master' into lsf_env
ajtritt Dec 12, 2020
7be8f1d
add set_ranks_to_trainer
ajtritt Feb 11, 2021
5c04b8e
Merge remote-tracking branch 'pl/master' into lsf_env
ajtritt Feb 12, 2021
004daef
Merge remote-tracking branch 'pl/master' into lsf_env
ajtritt Feb 12, 2021
a113210
remove unused import
ajtritt Feb 12, 2021
d17281c
move to new location
ajtritt Feb 12, 2021
b4028a7
Merge branch 'master' into lsf_env
awaelchli Jul 9, 2021
7a23376
update LSF environment
awaelchli Jul 9, 2021
02410ff
remove mixin
awaelchli Jul 9, 2021
7f91740
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
1b3bc7a
changelog
awaelchli Jul 9, 2021
5ec0e9f
Merge remote-tracking branch 'ajtritt/lsf_env' into lsf_env
awaelchli Jul 9, 2021
92215ab
reset slurm env
awaelchli Jul 9, 2021
a613759
add tests
awaelchli Jul 9, 2021
f7c5e0e
add licence
awaelchli Jul 9, 2021
00de88e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
cfd59b8
test node_rank
awaelchli Jul 9, 2021
5ec99e9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
cfe544f
add lsf env to docs
awaelchli Jul 9, 2021
71569de
add auto detection for lsf environment
awaelchli Jul 9, 2021
7c26b41
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
077964d
fix is_using_lsf() and test
awaelchli Jul 9, 2021
7f127c8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 9, 2021
3 changes: 0 additions & 3 deletions pytorch_lightning/accelerators/accelerator_connector.py
@@ -123,9 +123,6 @@ def __init__(
         self.interactive_ddp_procs = []
         self.global_rank = 0

-        # NVIDIA setup
-        # self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids)
-
         # benchmarking
         # TODO: should this be moved to GPU accelerator?
         torch.backends.cudnn.benchmark = self.benchmark
3 changes: 1 addition & 2 deletions pytorch_lightning/accelerators/legacy/ddp2_accelerator.py
@@ -150,8 +150,7 @@ def ddp_train(self, process_idx, mp_queue, model):
         model.trainer = self.trainer
         self.init_ddp_connection(
             self.trainer.global_rank,
-            self.trainer.world_size,
-            self.trainer.is_slurm_managing_tasks
+            self.trainer.world_size
         )

         if isinstance(self.ddp_plugin, RPCPlugin):
3 changes: 1 addition & 2 deletions pytorch_lightning/accelerators/legacy/ddp_accelerator.py
@@ -260,8 +260,7 @@ def ddp_train(self, process_idx, model):
         model.trainer = self.trainer
         self.init_ddp_connection(
             self.trainer.global_rank,
-            self.trainer.world_size,
-            self.trainer.is_slurm_managing_tasks
+            self.trainer.world_size
         )

         if isinstance(self.ddp_plugin, RPCPlugin):
@@ -205,8 +205,7 @@ def set_world_ranks(self, process_idx):
         self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
         self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes

-    def model_to_device(self, model, process_idx):
-        # Todo: required argument `process_idx` is not used
+    def model_to_device(self, model):
         model.cpu()

     def get_device_ids(self):
15 changes: 11 additions & 4 deletions pytorch_lightning/accelerators/legacy/ddp_hpc_accelerator.py
@@ -19,6 +19,7 @@
 from torch.nn.parallel import DistributedDataParallel

 from pytorch_lightning import _logger as log
+from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
 from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp
 from pytorch_lightning.core.lightning import LightningModule
 from pytorch_lightning.distributed.dist import LightningDistributed
@@ -29,7 +30,7 @@
 from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available


-class DDPHPCAccelerator(Accelerator):
+class DDPHPCAccelerator(Accelerator, NVIDIAMixin):

     def __init__(self,
                  trainer,
@@ -52,6 +53,10 @@ def __init__(self,

     def setup(self, model):
         self.trainer.model = model
+        # ----------------------------
+        # NVIDIA FLAGS
+        # ----------------------------
+        self.set_nvidia_flags(self.trainer.data_parallel_device_ids)
         self.task_idx = self.cluster_environment.local_rank()

     def train(self):
@@ -60,7 +65,7 @@

     def set_world_ranks(self, process_idx):
         self.trainer.local_rank = process_idx
-        self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
+        self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx
         self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes

     def init_device(self, process_idx):
@@ -132,14 +137,16 @@ def ddp_train(self, process_idx, model):
         # set warning rank
         rank_zero_only.rank = self.trainer.global_rank

+        # Initialize cuda device
+        self.init_device(process_idx)
+
         # set up server using proc 0's ip address
         # try to init for 20 times at max in case ports are taken
         # where to store ip_table
         model.trainer = self.trainer
         self.init_ddp_connection(
             self.trainer.global_rank,
-            self.trainer.world_size,
-            self.trainer.is_slurm_managing_tasks
+            self.trainer.world_size
         )

         if isinstance(self.ddp_plugin, RPCPlugin):
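The rank bookkeeping above is the usual node-major layout: each node contributes num_processes consecutive global ranks, with the node index now taken from the cluster environment rather than from the trainer. A quick worked example with purely illustrative numbers (not taken from this PR):

num_nodes = 3
num_processes = 4      # processes (GPUs) per node
node_rank = 1          # reported by the cluster environment (SLURM, LSF, ...)
process_idx = 2        # local rank of this process on its node

global_rank = node_rank * num_processes + process_idx  # -> 6
world_size = num_nodes * num_processes                 # -> 12
assert 0 <= global_rank < world_size

The same mixin and cluster-environment wiring is applied to the spawn-based accelerator in the next file below.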
@@ -21,6 +21,7 @@
 from torch.nn.parallel import DistributedDataParallel

 from pytorch_lightning import _logger as log
+from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
 from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp
 from pytorch_lightning.core.lightning import LightningModule
 from pytorch_lightning.distributed import LightningDistributed
@@ -40,7 +41,7 @@
 from pytorch_lightning.utilities.seed import seed_everything


-class DDPSpawnAccelerator(Accelerator):
+class DDPSpawnAccelerator(Accelerator, NVIDIAMixin):

     def __init__(self,
                  trainer,
@@ -64,6 +65,10 @@ def __init__(self,

     def setup(self, model):
         os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port()))
+        # ----------------------------
+        # NVIDIA FLAGS
+        # ----------------------------
+        self.set_nvidia_flags(self.trainer.data_parallel_device_ids)

         # pass in a state q
         smp = mp.get_context('spawn')
7 changes: 6 additions & 1 deletion pytorch_lightning/accelerators/legacy/gpu_accelerator.py
@@ -15,13 +15,14 @@

 import torch

+from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
 from pytorch_lightning.accelerators.legacy.accelerator import Accelerator, ReduceOp
 from pytorch_lightning.distributed.dist import LightningDistributed
 from pytorch_lightning.plugins.environments import ClusterEnvironment
 from pytorch_lightning.utilities import AMPType


-class GPUAccelerator(Accelerator):
+class GPUAccelerator(Accelerator, NVIDIAMixin):
     amp_backend: AMPType

     def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None):
@@ -39,6 +40,10 @@ def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] =
         self.nickname = None

     def setup(self, model):
+        # ----------------------------
+        # NVIDIA FLAGS
+        # ----------------------------
+        self.set_nvidia_flags(self.trainer.data_parallel_device_ids)

         # call setup
         self.trainer.call_setup_hook(model)
31 changes: 31 additions & 0 deletions pytorch_lightning/accelerators/nvidia_mixin.py
@@ -0,0 +1,31 @@
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import os

import torch

from pytorch_lightning import _logger as log


class NVIDIAMixin:

    def set_nvidia_flags(self, data_parallel_device_ids):
        if data_parallel_device_ids is None:
            return

        # set the correct cuda visible devices (using pci order)
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())])
        devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids)
        log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]')
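For context, a rough sketch of how an accelerator picks up the mixin during setup; the FakeTrainer stand-in and TinyAccelerator below are illustrative only and not part of this PR:

from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin


class FakeTrainer:
    # Stand-in exposing the two attributes set_nvidia_flags reads.
    local_rank = 0
    data_parallel_device_ids = [0, 1]


class TinyAccelerator(NVIDIAMixin):
    def __init__(self, trainer):
        self.trainer = trainer

    def setup(self):
        # Pins PCI bus device ordering and logs CUDA_VISIBLE_DEVICES.
        self.set_nvidia_flags(self.trainer.data_parallel_device_ids)


TinyAccelerator(FakeTrainer()).setup()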
11 changes: 11 additions & 0 deletions pytorch_lightning/plugins/environments/cluster_environment.py
@@ -31,3 +31,14 @@ def world_size(self):

     def local_rank(self):
         pass
+
+    def global_rank(self):
+        pass
+
+    def node_rank(self):
+        pass
+
+    def set_ranks_to_trainer(self):
+        trainer.local_rank = self.accelerator_backend.cluster_environment.local_rank()
+        trainer.node_rank = self.accelerator_backend.cluster_environment.node_rank()
+        trainer.global_rank = self.accelerator_backend.cluster_environment.global_rank()
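Note that, as committed here, set_ranks_to_trainer refers to a bare trainer name and to self.accelerator_backend, neither of which ClusterEnvironment defines, so calling it would raise a NameError. A minimal sketch of the wiring it appears to intend, assuming the caller passes the trainer in (the signature is illustrative, not part of this commit):

def set_ranks_to_trainer(self, trainer) -> None:
    # Copy the ranks resolved by this cluster environment onto the trainer.
    trainer.local_rank = self.local_rank()
    trainer.node_rank = self.node_rank()
    trainer.global_rank = self.global_rank()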
176 changes: 176 additions & 0 deletions pytorch_lightning/plugins/environments/lsf_environment.py
@@ -0,0 +1,176 @@
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
import socket
import warnings
from pytorch_lightning import _logger as log
from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment


class LSFEnvironment(ClusterEnvironment):
    """An environment for running on clusters managed by the LSF resource manager.

    It is expected that any execution using this ClusterEnvironment was launched
    through the Job Step Manager, i.e. jsrun.

    This plugin expects the following environment variables:

    LSB_JOBID
        The LSF assigned job ID

    LSB_HOSTS
        The hosts used in the job. This string is expected to have the format "batch <rank_0_host> ..."

    JSM_NAMESPACE_LOCAL_RANK
        The node local rank for the task. This environment variable is set by jsrun

    JSM_NAMESPACE_RANK
        The global rank for the task. This environment variable is set by jsrun

    JSM_NAMESPACE_SIZE
        The world size for the task. This environment variable is set by jsrun
    """

    def __init__(self):
        self._master_address = self._get_master_address()
        self._master_port = self._get_master_port()
        self._local_rank = self._get_local_rank()
        self._global_rank = self._get_global_rank()
        self._world_size = self._get_world_size()
        self._node_rank = self._get_node_rank()

        # set environment variables needed for initializing torch distributed process group
        os.environ["MASTER_ADDR"] = str(self._master_address)
        log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
        os.environ["MASTER_PORT"] = str(self._master_port)
        log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")

    def _read_hosts(self):
        var = "LSB_HOSTS"
        hosts = os.environ.get(var)
        if not hosts:
            raise ValueError("Could not find hosts -- expected in environment variable %s" % var)
        hosts = hosts.split()
        if len(hosts) < 2:
            raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- "
                             "expected format \"batch <rank_0_host> ...\"")
        return hosts

    def _get_master_address(self):
        """A helper for getting the master address"""
        hosts = self._read_hosts()
        return hosts[1]

    def _get_master_port(self):
        """A helper for getting the master port

        Use the LSF job ID so all ranks can compute the master port
        """
        # check for user-specified master port
        port = os.environ.get("MASTER_PORT")
        if not port:
            var = "LSB_JOBID"
            jobid = os.environ.get(var)
            if not jobid:
                raise ValueError("Could not find job id -- expected in environment variable %s" % var)
            else:
                port = int(jobid)
                # all ports should be in the 10k+ range
                port = int(port) % 1000 + 10000
                log.debug("calculated master port")
        else:
            log.debug("using externally specified master port")
        return port

    def _get_global_rank(self):
        """A helper function for getting the global rank

        Read this from the environment variable JSM_NAMESPACE_RANK
        """
        var = "JSM_NAMESPACE_RANK"
        global_rank = os.environ.get(var)
        if global_rank is None:
            raise ValueError("Cannot determine global rank -- expected in %s "
                             "-- make sure you run your executable with jsrun" % var)
        return int(global_rank)

    def _get_local_rank(self):
        """A helper function for getting the local rank

        Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK
        """
        var = "JSM_NAMESPACE_LOCAL_RANK"
        local_rank = os.environ.get(var)
        if local_rank is None:
            raise ValueError("Cannot determine local rank -- expected in %s "
                             "-- make sure you run your executable with jsrun" % var)
        return int(local_rank)

    def _get_world_size(self):
        """A helper function for getting the world size

        Read this from the environment variable JSM_NAMESPACE_SIZE
        """
        var = "JSM_NAMESPACE_SIZE"
        world_size = os.environ.get(var)
        if world_size is None:
            raise ValueError("Cannot determine world size -- expected in %s "
                             "-- make sure you run your executable with jsrun" % var)
        return int(world_size)

    def _get_node_rank(self):
        """A helper function for getting the node rank"""
        hosts = self._read_hosts()
        count = dict()
        for host in hosts:
            if 'batch' in host or 'login' in host:
                continue
            if host not in count:
                count[host] = len(count)
        return count[socket.gethostname()]

    def master_address(self):
        """
        Master address is read from a list of hosts contained in the environment variable *LSB_HOSTS*
        """
        return self._master_address

    def master_port(self):
        """
        Master port is calculated from the LSF job ID
        """
        return self._master_port

    def world_size(self):
        """
        World size is read from the environment variable JSM_NAMESPACE_SIZE
        """
        return self._world_size

    def local_rank(self):
        """
        The local rank is read from the environment variable JSM_NAMESPACE_LOCAL_RANK
        """
        return self._local_rank

    def node_rank(self):
        """
        Node rank is determined by the position of the current hostname in the list of hosts stored in LSB_HOSTS
        """
        return self._node_rank

    def global_rank(self):
        """
        The global rank is read from the environment variable JSM_NAMESPACE_RANK
        """
        return self._global_rank
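For orientation (not part of the diff): LSFEnvironment derives everything from variables that LSF and jsrun set per task. Below is a minimal smoke test with made-up values exercising the derivations above; the hostname patch is needed because node_rank looks up the current host in LSB_HOSTS, and the import path follows the file location shown in this diff:

import os
from unittest import mock

from pytorch_lightning.plugins.environments.lsf_environment import LSFEnvironment

# Illustrative values only: a 2-node job with 2 ranks per node.
os.environ.pop("MASTER_PORT", None)
os.environ.update({
    "LSB_JOBID": "1234",
    "LSB_HOSTS": "batch host0 host0 host1 host1",
    "JSM_NAMESPACE_RANK": "3",
    "JSM_NAMESPACE_LOCAL_RANK": "1",
    "JSM_NAMESPACE_SIZE": "4",
})

with mock.patch("socket.gethostname", return_value="host1"):
    env = LSFEnvironment()

assert env.master_address() == "host0"  # first host after the "batch" entry
assert env.master_port() == 10234       # 1234 % 1000 + 10000
assert env.global_rank() == 3
assert env.local_rank() == 1
assert env.world_size() == 4
assert env.node_rank() == 1             # host1 is the second distinct compute host

Later commits in this PR (a613759, 71569de) add real tests and automatic detection of the LSF environment; passing the environment to the Trainer explicitly through its plugins argument should also work once merged.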