Multi-node training support #440

Merged: 39 commits merged from multi_node_training into main on Feb 29, 2024

Commits (39)
3552f00  Fix temporary directory creation for multi-node setting (michaelbenayoun, Jan 25, 2024)
fba21f2  [WIP] llama-70b (michaelbenayoun, Feb 1, 2024)
861c782  [WIP] llama-70b (michaelbenayoun, Feb 1, 2024)
eaae663  [WIP] llama-70b (michaelbenayoun, Feb 2, 2024)
6eeeaa0  [WIP] llama-70b (michaelbenayoun, Feb 5, 2024)
3d99397  [WIP] llama-70b (michaelbenayoun, Feb 6, 2024)
dd49c38  [WIP] llama-70b (michaelbenayoun, Feb 6, 2024)
75a3f91  [WIP] llama-70b (michaelbenayoun, Feb 7, 2024)
c4d50b1  [WIP] llama-70b (michaelbenayoun, Feb 7, 2024)
f734478  [WIP] llama-70b (michaelbenayoun, Feb 7, 2024)
c21286f  [WIP] llama-70b (michaelbenayoun, Feb 7, 2024)
13c99a4  [WIP] llama-70b (michaelbenayoun, Feb 8, 2024)
11d1aed  [WIP] llama-70b (michaelbenayoun, Feb 9, 2024)
bb49603  [WIP] llama-70b (michaelbenayoun, Feb 15, 2024)
f5eddc4  [WIP] llama-70b (michaelbenayoun, Feb 16, 2024)
0aac6a4  Merge branch 'main' into multi_node_training (michaelbenayoun, Feb 16, 2024)
3574be6  Change all_reduce method (not working but might be running) (michaelbenayoun, Feb 16, 2024)
e8e82f8  Initialize linears later (michaelbenayoun, Feb 16, 2024)
b06d110  [WIP] llama-70b (michaelbenayoun, Feb 19, 2024)
0a591f4  Fix for Llama (michaelbenayoun, Feb 19, 2024)
33c5136  [WIP] llama-70b (michaelbenayoun, Feb 20, 2024)
3b1d235  [WIP] llama-70b (michaelbenayoun, Feb 20, 2024)
9a73124  [WIP] llama-70b (michaelbenayoun, Feb 23, 2024)
8c22562  [WIP] llama-70b (michaelbenayoun, Feb 23, 2024)
746fdf0  Fix cache system with custom cache dir (michaelbenayoun, Feb 26, 2024)
bcde7f7  Fix metrics logging and saving (michaelbenayoun, Feb 26, 2024)
f0c4cd2  Add the possibility to add keyword arguments (michaelbenayoun, Feb 26, 2024)
b97f677  Styling (michaelbenayoun, Feb 26, 2024)
2109b3a  Fix (michaelbenayoun, Feb 26, 2024)
5ac76e6  Fix (michaelbenayoun, Feb 26, 2024)
4db5a78  Fix (michaelbenayoun, Feb 26, 2024)
eda9151  Merge branch 'main' into multi_node_training (michaelbenayoun, Feb 26, 2024)
9e85d5b  Styling (michaelbenayoun, Feb 26, 2024)
aee49b6  Apply suggestions (michaelbenayoun, Feb 29, 2024)
3d5251b  Apply suggestions (michaelbenayoun, Feb 29, 2024)
adbfe54  Fix docstring (michaelbenayoun, Feb 29, 2024)
bc4c851  Apply changes (michaelbenayoun, Feb 29, 2024)
a6451dc  Fix minor issue (michaelbenayoun, Feb 29, 2024)
f3a32bc  Final fixes (michaelbenayoun, Feb 29, 2024)
Files changed (showing changes from 35 of the 39 commits)
benchmark/text-generation/llama2-13b.py (1 change: 0 additions, 1 deletion)

@@ -1,4 +1,3 @@
-import os
 from tempfile import TemporaryDirectory

 from transformers import AutoTokenizer
examples/language-modeling/run_clm.py (7 changes: 4 additions, 3 deletions)

@@ -466,9 +466,10 @@ def main():

     # We resize the embeddings only when necessary to avoid index errors. If you are creating a model from scratch
     # on a small vocab and want a smaller embedding size, remove this test.
-    embedding_size = model.get_input_embeddings().weight.shape[0]
-    if len(tokenizer) > embedding_size:
-        model.resize_token_embeddings(len(tokenizer))
+    # TODO: uncomment that.
+    # embedding_size = model.get_input_embeddings().weight.shape[0]
+    # if len(tokenizer) > embedding_size:
+    #     model.resize_token_embeddings(len(tokenizer))

     # Preprocessing the datasets.
     # First we tokenize all the texts.
optimum/commands/neuron/cache.py (7 changes: 6 additions, 1 deletion)

@@ -23,6 +23,7 @@
     create_custom_cache_repo,
     set_custom_cache_repo_name_in_hf_home,
 )
+from ...neuron.utils.require_utils import requires_torch_neuronx
 from ...neuron.utils.runner import ExampleRunner
 from ...utils import logging
 from ..base import BaseOptimumCLICommand, CommandInfo

@@ -165,9 +166,13 @@ class SynchronizeRepoCommand(BaseOptimumCLICommand):
     @staticmethod
     def parse_args(parser: "ArgumentParser"):
         parser.add_argument("--repo_id", type=str, default=None, help="The name of the repo to use as remote cache.")
+        parser.add_argument(
+            "--cache_dir", type=str, default=None, help="The cache directory that contains the compilation files."
+        )

+    @requires_torch_neuronx
     def run(self):
-        synchronize_hub_cache(self.args.repo_id)
+        synchronize_hub_cache(cache_path=self.args.cache_dir, cache_repo_id=self.args.repo_id)


 class LookupRepoCommand(BaseOptimumCLICommand):
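For context, a minimal sketch of how the extended synchronization entry point might be called directly, mirroring what the updated SynchronizeRepoCommand.run() does above. The keyword arguments (cache_path, cache_repo_id) come from the hunk itself; the import path, the CLI invocation (presumably something like "optimum-cli neuron cache synchronize --repo_id ... --cache_dir ..."), and the example values are assumptions.

# Hedged sketch: synchronizing a local compilation cache directory with a Hub repo,
# as the updated run() method does. Only the keyword arguments are taken from the diff;
# the module path below is assumed.
from optimum.neuron.utils.cache_utils import synchronize_hub_cache  # assumed import path

synchronize_hub_cache(
    cache_path="/var/tmp/neuron-compile-cache",   # illustrative path, mirrors the new --cache_dir option
    cache_repo_id="my-org/optimum-neuron-cache",  # illustrative repo id, mirrors --repo_id
)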
optimum/neuron/accelerate/accelerator.py (25 changes: 21 additions, 4 deletions)

@@ -43,7 +43,7 @@
     patch_within_function,
     patched_finfo,
 )
-from ..utils.misc import args_and_kwargs_to_kwargs_only
+from ..utils.misc import args_and_kwargs_to_kwargs_only, is_main_worker
 from ..utils.require_utils import requires_neuronx_distributed, requires_torch_xla
 from .optimizer import NeuronAcceleratedOptimizer
 from .scheduler import NeuronAcceleratedScheduler

@@ -173,7 +173,11 @@ def __init__(self, *args, mp_plugin: Optional[ModelParallelismPlugin] = None, ze
         self.gradient_accumulation_steps = num_steps

     def _prepare_data_loader_for_distributed(
-        self, data_loader: DataLoader, num_replicas: int, rank: int
+        self,
+        data_loader: DataLoader,
+        num_replicas: int,
+        rank: int,
+        force_drop_last: bool,
     ) -> DataLoader:
         # TODO: make it more robust, similar to the prepare_data_loader function in `accelerate`.
         if isinstance(data_loader.sampler, DistributedSampler):

@@ -201,22 +205,32 @@ def _prepare_data_loader_for_distributed(
             num_workers=data_loader.num_workers,
             collate_fn=data_loader.collate_fn,
             pin_memory=data_loader.pin_memory,
-            drop_last=data_loader.drop_last,
+            drop_last=data_loader.drop_last or force_drop_last,
         )

         distributed_dataloader._is_accelerate_prepared = True
         return distributed_dataloader

     def prepare_data_loader(self, data_loader: DataLoader, device_placement: Optional[bool] = None):
+        force_drop_last = False
         if self.state.distributed_type is NeuronDistributedType.MODEL_PARALLELISM:
             from neuronx_distributed import parallel_layers

             num_replicas = parallel_layers.parallel_state.get_data_parallel_size()
             rank = parallel_layers.parallel_state.get_data_parallel_rank()
+            force_drop_last = parallel_layers.parallel_state.get_pipeline_model_parallel_size() > 1
+            if is_main_worker() and force_drop_last:
+                logger.warning(
+                    "Pipeline parallelsim: forcing the dataloader to drop the last incomplete batch because it can "
+                    "cause failure if the last batch size is not divisible by the number of microbatches for the pipeline."
+                )
         else:
             num_replicas = xm.xrt_world_size()
             rank = xm.get_ordinal()
         if self.state.num_processes > 1:
-            data_loader = self._prepare_data_loader_for_distributed(data_loader, num_replicas=num_replicas, rank=rank)
+            data_loader = self._prepare_data_loader_for_distributed(
+                data_loader, num_replicas=num_replicas, rank=rank, force_drop_last=force_drop_last
+            )
+        # No need to wrap the dataloader if we are using pipeline parallelism.
         if self.state.mp_plugin.pipeline_parallel_size == 1:
             data_loader = MpDeviceLoader(data_loader, self.device)

@@ -471,6 +485,9 @@ def prepare_model(

         model = self.patch_model_for_neuron(model)

+        # We do not want to use the cache here as it would imply more communication that we do not need.
+        model.config.use_cache = False
+
         if self.distributed_type is NeuronDistributedType.XLA_FSDP:
             return self.prepare_model_for_xla_fsdp(
                 model, device_placement=device_placement, evaluation_mode=evaluation_mode
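As a side note on why force_drop_last is needed: under pipeline parallelism each batch is split into a fixed number of equal microbatches, so a trailing batch whose size is not divisible by the microbatch count cannot be split and would fail inside the pipeline engine. A small illustrative sketch, not part of the PR (the helper name and values are made up):

# Illustrative sketch (not from the PR): splitting a batch into a fixed number of
# equal microbatches only works when the batch size is divisible by that number,
# which is why the last incomplete batch is dropped under pipeline parallelism.
def split_into_microbatches(batch, num_microbatches):
    if len(batch) % num_microbatches != 0:
        raise ValueError(
            f"A batch of size {len(batch)} cannot be split into {num_microbatches} equal microbatches."
        )
    size = len(batch) // num_microbatches
    return [batch[i * size : (i + 1) * size] for i in range(num_microbatches)]

full_batch = list(range(32))  # 32 % 4 == 0, splits cleanly
last_batch = list(range(30))  # 30 % 4 != 0, cannot be split into 4 equal microbatches

print(len(split_into_microbatches(full_batch, 4)))  # 4 microbatches of 8 samples each
# split_into_microbatches(last_batch, 4) would raise, which is why force_drop_last is set
# whenever the pipeline model parallel size is greater than 1.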
optimum/neuron/accelerate/state.py (21 changes: 8 additions, 13 deletions)

@@ -267,23 +267,11 @@ def __init__(
                 os.environ.get("ACCELERATE_USE_NEURONX_DISTRIBUTED_TP", "false") == "true"
                 or os.environ.get("ACCELERATE_USE_NEURONX_DISTRIBUTED_PP", "false") == "true"
             ):
-                if not is_neuronx_distributed_available():
-                    raise RuntimeError(
-                        "Model parallelism requires the neuronx_distributed package. You can install it by "
-                        "running: python -m pip install neuronx_distributed --extra-index-url "
-                        "https://pip.repos.neuron.amazonaws.com"
-                    )
                 if mp_plugin is None:
                     raise ValueError(
-                        "Could not initialize `neuronx_distributed` model parallelism because no "
-                        "`ModelParallelismPlugin` was provided."
+                        "Could not initialize model parallelism because no `ModelParallelismPlugin` was provided."
                     )
                 if mp_plugin.should_parallelize:
-                    if not parallel_state.model_parallel_is_initialized():
-                        parallel_state.initialize_model_parallel(
-                            tensor_model_parallel_size=mp_plugin.tensor_parallel_size,
-                            pipeline_model_parallel_size=mp_plugin.pipeline_parallel_size,
-                        )
                     self.distributed_type = NeuronDistributedType.MODEL_PARALLELISM
                 else:
                     logger.warning(

@@ -293,6 +281,13 @@ def __init__(
                 self.mp_plugin = mp_plugin
             else:
                 self.mp_plugin = ModelParallelismPlugin()
+
+            if torch.distributed.is_initialized() and not parallel_state.model_parallel_is_initialized():
+                parallel_state.initialize_model_parallel(
+                    tensor_model_parallel_size=self.mp_plugin.tensor_parallel_size,
+                    pipeline_model_parallel_size=self.mp_plugin.pipeline_parallel_size,
+                )
+
             if os.environ.get("ACCELERATE_USE_FSDP", "false") == "true":
                 self.distributed_type = NeuronDistributedType.XLA_FSDP
                 if self._mixed_precision != "no":
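In short, the parallel state is now initialized whenever torch.distributed is up, using the plugin's (possibly default) sizes, instead of only when model parallelism is explicitly requested. A simplified sketch of the new control flow, with the enclosing __init__ and the earlier branches omitted; the calls themselves are the ones shown in the hunk above, while the standalone function is only an illustration:

# Simplified sketch (assumption: extracted from the __init__ logic above for illustration).
import torch
from neuronx_distributed.parallel_layers import parallel_state

def maybe_initialize_model_parallel(mp_plugin):
    # With this change, the parallel state is set up as soon as torch.distributed is
    # initialized, even when tensor and pipeline parallel sizes are both 1.
    if torch.distributed.is_initialized() and not parallel_state.model_parallel_is_initialized():
        parallel_state.initialize_model_parallel(
            tensor_model_parallel_size=mp_plugin.tensor_parallel_size,
            pipeline_model_parallel_size=mp_plugin.pipeline_parallel_size,
        )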
optimum/neuron/accelerate/utils/dataclasses.py (2 changes: 2 additions, 0 deletions)

@@ -147,6 +147,7 @@ class ModelParallelismPlugin:
     pipeline_parallel_size: int = 1
     pipeline_parallel_num_microbatches: int = 1
     pipeline_parallel_use_zero1_optimizer: bool = False
+    gradient_checkpointing: bool = False
     checkpoint_dir: Optional[Union[str, Path]] = None

     def __post_init__(self):

@@ -176,6 +177,7 @@ def parallelize_model(
             sequence_parallel_enabled=self.sequence_parallel_enabled,
             pipeline_parallel_num_microbatches=self.pipeline_parallel_num_microbatches,
             pipeline_parallel_use_zero1_optimizer=self.pipeline_parallel_use_zero1_optimizer,
+            pipeline_parallel_gradient_checkpointing_enabled=self.gradient_checkpointing,
             checkpoint_dir=self.checkpoint_dir,
         )
         return parallelized_model
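To illustrate the new field, a hedged example of constructing the plugin with gradient checkpointing enabled. The field names come from the dataclass and the state.py hunk above (tensor_parallel_size is referenced there as mp_plugin.tensor_parallel_size); the concrete values are illustrative only, and any validation performed by __post_init__ is not shown here.

# Hedged example: enabling the new gradient_checkpointing flag on the plugin.
# Field names are taken from the diff above; values are illustrative assumptions.
from optimum.neuron.accelerate.utils.dataclasses import ModelParallelismPlugin  # module path from the file header above

mp_plugin = ModelParallelismPlugin(
    tensor_parallel_size=8,                       # referenced via mp_plugin.tensor_parallel_size in state.py
    pipeline_parallel_size=4,
    pipeline_parallel_num_microbatches=4,
    pipeline_parallel_use_zero1_optimizer=False,
    gradient_checkpointing=True,                  # new flag added by this PR
    checkpoint_dir=None,
)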