From 05db815fe569e1b7831a0da8523af6ebb8afe65d Mon Sep 17 00:00:00 2001 From: Anna Shors <71393111+ashors1@users.noreply.github.com> Date: Mon, 15 Jul 2024 16:42:25 -0700 Subject: [PATCH] [NeMo-UX] Make TE and Apex dependencies optional (#9732) * [NeMo-UX] Make TE and Apex dependencies optional (#9550) * Provide a pure pytorch/jit path to avoid required dependency on TE and Apex Signed-off-by: ashors1 * add missing file Signed-off-by: ashors1 * add minimal gpt pretraining example Signed-off-by: ashors1 * fix pre-training datamodule initialization Signed-off-by: ashors1 * add non-te/non-apex test Signed-off-by: ashors1 * add comment to pretraining script Signed-off-by: ashors1 * use microbatch calculator from mcore Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * fix nemo 2 test name Signed-off-by: ashors1 * update Mcore commit for CI Signed-off-by: ashors1 * replace apex microbatch calculator with megatron's in more places Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * fix missing import Signed-off-by: ashors1 * fix typo Signed-off-by: ashors1 * fix missed apex import Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 Signed-off-by: ashors1 * move imports Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 Signed-off-by: ashors1 * move imports Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * add types to command-line args Signed-off-by: ashors1 * bug fix Signed-off-by: ashors1 * fix path Signed-off-by: ashors1 * Disable distributed optimizer in nemo 2.0 test Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * fix optimizer config Signed-off-by: ashors1 * update checkpointing Signed-off-by: ashors1 * move import Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * fix failing unit test Signed-off-by: ashors1 * fix failing test Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * Updating num_weights check of RETRO due to underlying changes from mcore RETRO MLM Signed-off-by: huvunvidia <86480512+huvunvidia@users.noreply.github.com> * Apply isort and black reformatting Signed-off-by: huvunvidia * fix typo Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 * remove stale warning Signed-off-by: ashors1 * fix lora notebook Signed-off-by: ashors1 * fix small typo Signed-off-by: ashors1 * add import guards to gemma2 Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 --------- Signed-off-by: ashors1 Signed-off-by: ashors1 Signed-off-by: huvunvidia <86480512+huvunvidia@users.noreply.github.com> Signed-off-by: huvunvidia Co-authored-by: ashors1 Co-authored-by: Eric Harper Co-authored-by: huvunvidia <86480512+huvunvidia@users.noreply.github.com> Co-authored-by: huvunvidia * fix cherry-pick Signed-off-by: ashors1 * Apply isort and black reformatting Signed-off-by: ashors1 --------- Signed-off-by: ashors1 Signed-off-by: ashors1 Signed-off-by: huvunvidia <86480512+huvunvidia@users.noreply.github.com> Signed-off-by: huvunvidia Co-authored-by: ashors1 Co-authored-by: Eric Harper Co-authored-by: huvunvidia <86480512+huvunvidia@users.noreply.github.com> Co-authored-by: huvunvidia Signed-off-by: Tugrul Konuk --- .github/workflows/cicd-main.yml | 43 +++++ Dockerfile.ci | 2 +- examples/llm/megatron_gpt_pretraining.py | 109 +++++++++++ .../nmt_transformer_infer_megatron.py | 19 +- 
nemo/collections/llm/gpt/data/pre_training.py | 6 +- nemo/collections/llm/gpt/model/base.py | 15 +- .../models/multimodal_llm/neva/neva_model.py | 11 +- .../text_to_image/controlnet/controlnet.py | 102 ++++++----- .../text_to_image/dreambooth/dreambooth.py | 58 +++--- .../models/text_to_image/imagen/imagen.py | 67 ++++--- .../stable_diffusion/diffusion_engine.py | 2 +- .../stable_diffusion/ldm/ddpm.py | 171 +++++++++++------- .../clip/megatron_clip_models.py | 2 +- .../megatron_nsfw_clip_models.py | 2 +- .../speech_llm/models/modular_models.py | 9 +- .../speech_llm/models/modular_t5_models.py | 15 +- .../common/audio_text_generation_utils.py | 10 +- .../megatron/retro_dataset.py | 102 ++++++----- .../dialogue/dialogue_s2s_generation_model.py | 9 +- .../megatron_bert_embedding_model.py | 22 +-- .../megatron/gpt_layer_modelopt_spec.py | 2 + .../language_modeling/megatron_base_model.py | 13 +- .../megatron_base_prompt_learning_model.py | 9 +- .../language_modeling/megatron_bert_model.py | 89 +++++---- .../language_modeling/megatron_gpt_model.py | 27 +-- .../megatron_gpt_prompt_learning_model.py | 10 +- .../megatron_gpt_sft_model.py | 18 +- .../megatron_lm_encoder_decoder_model.py | 18 +- .../language_modeling/megatron_retro_model.py | 12 +- .../language_modeling/megatron_t0_model.py | 26 +-- .../megatron_t5_prompt_learning_model.py | 78 ++++---- .../megatron_t5_sft_model.py | 40 ++-- .../machine_translation/megatron_nmt_model.py | 72 ++++---- .../modules/common/megatron/megatron_init.py | 16 +- .../modules/common/megatron/transformer.py | 2 +- .../common/text_generation_strategy.py | 9 +- .../modules/common/text_generation_utils.py | 10 +- nemo/collections/nlp/parts/nlp_overrides.py | 2 +- .../megatron_vit_classification_models.py | 79 ++++---- nemo/lightning/data.py | 10 +- nemo/lightning/megatron_parallel.py | 1 + nemo/lightning/pytorch/optim/megatron.py | 7 +- .../lightning/pytorch/plugins/data_sampler.py | 16 +- nemo/utils/apex_utils.py | 25 +++ .../collections/nlp/test_rampup_batch_size.py | 20 +- tutorials/nlp/lora.ipynb | 2 +- 46 files changed, 762 insertions(+), 627 deletions(-) create mode 100644 examples/llm/megatron_gpt_pretraining.py create mode 100644 nemo/utils/apex_utils.py diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index 6ae11032d0a3..46182745e52d 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -4516,6 +4516,48 @@ jobs: AFTER_SCRIPT: | rm -rf examples/multimodal/text_to_image/sd_train_results + L2_NeMo_2_GPT_Pretraining_no_transformer_engine: + needs: [cicd-test-container-setup] + runs-on: self-hosted-azure + timeout-minutes: 10 + container: + image: nemoci.azurecr.io/nemo_container_${{ github.run_id }} + options: + --device=/dev/nvidia0 + --gpus all + --shm-size=8g + --env TRANSFORMERS_OFFLINE=0 + --volume /mnt/datadrive/TestData:/home/TestData + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - run: | + pip uninstall -y apex ## TODO: remove when apex is no longer a dependency + pip uninstall -y transformer_engine + + python examples/llm/megatron_gpt_pretraining.py \ + --devices=2 \ + --max-steps=3 \ + --experiment-dir=examples/llm/gpt_pretrain_results \ + --vocab-path=/home/TestData/nlp/megatron_gpt/data/gpt/vocab.json \ + --merges-path=/home/TestData/nlp/megatron_gpt/data/gpt/merges.txt \ + --data-path=/home/TestData/nlp/megatron_gpt/data/gpt/simple_wiki_gpt_preproc_text_document \ + --index-mapping-dir=examples/llm/gpt_index_mappings + + python 
examples/llm/megatron_gpt_pretraining.py \ + --devices=2 \ + --max-steps=6 \ + --experiment-dir=examples/llm/gpt_pretrain_results \ + --vocab-path=/home/TestData/nlp/megatron_gpt/data/gpt/vocab.json \ + --merges-path=/home/TestData/nlp/megatron_gpt/data/gpt/merges.txt \ + --data-path=/home/TestData/nlp/megatron_gpt/data/gpt/simple_wiki_gpt_preproc_text_document \ + --index-mapping-dir=examples/llm/gpt_index_mappings + + rm -rf examples/llm/gpt_pretrain_results + rm -rf examples/llm/gpt_index_mappings + - uses: "NVIDIA/NeMo/.github/actions/cancel-workflow@main" + if: "failure()" + Nemo_CICD_Test: needs: - L0_Unit_Tests_GPU @@ -4616,6 +4658,7 @@ jobs: - L2_TTS_Fast_dev_runs_1_Hifigan - Speech_Checkpoints_tests - L2_Stable_Diffusion_Training + - L2_NeMo_2_GPT_Pretraining_no_transformer_engine if: always() runs-on: ubuntu-latest steps: diff --git a/Dockerfile.ci b/Dockerfile.ci index 2a7006c057f1..15cd016073ca 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -34,7 +34,7 @@ WORKDIR /workspace # Install NeMo requirements ARG TE_TAG=7d576ed25266a17a7b651f2c12e8498f67e0baea ARG MODELOPT_VERSION=0.13.0 -ARG MCORE_TAG=c0164bcfd4f8213a10a6b1e47ef80721a68b4fb6 +ARG MCORE_TAG=c7a1f82d761577e6ca0338d3521eac82f2aa0904 ARG APEX_TAG=810ffae374a2b9cb4b5c5e28eaeca7d7998fca0c RUN \ --mount=type=bind,source=requirements,target=requirements \ diff --git a/examples/llm/megatron_gpt_pretraining.py b/examples/llm/megatron_gpt_pretraining.py new file mode 100644 index 000000000000..a88e01ba5dda --- /dev/null +++ b/examples/llm/megatron_gpt_pretraining.py @@ -0,0 +1,109 @@ +## NOTE: This script is present for github-actions testing only. +## There are no guarantees that this script is up-to-date with latest NeMo. + +import argparse + +from megatron.core.optimizer import OptimizerConfig +from pytorch_lightning.loggers import TensorBoardLogger + +from nemo import lightning as nl +from nemo.collections import llm +from nemo.collections.llm.api import train +from nemo.collections.llm.gpt.data import PreTrainingDataModule +from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer +from nemo.lightning import NeMoLogger +from nemo.lightning.pytorch.callbacks import ModelCheckpoint +from nemo.lightning.pytorch.optim.megatron import MegatronOptimizerModule + + +def get_args(): + parser = argparse.ArgumentParser(description='Train a small GPT model using NeMo 2.0') + parser.add_argument('--devices', type=int, help="Number of devices to use for training") + parser.add_argument('--max-steps', type=int, help="Number of steps to train for") + parser.add_argument('--experiment-dir', type=str, help="directory to write results and checkpoints to") + parser.add_argument('--data-path', type=str, help="Path to data file") + parser.add_argument('--vocab-path', type=str, help="Path to vocab file") + parser.add_argument('--merges-path', type=str, help="Path to merges file") + parser.add_argument('--index-mapping-dir', type=str, help="directory to write index mappings to") + + return parser.parse_args() + + +if __name__ == '__main__': + + args = get_args() + + seq_length = 2048 + + tokenizer = get_nmt_tokenizer( + "megatron", + "GPT2BPETokenizer", + vocab_file=args.vocab_path, + merges_file=args.merges_path, + ) + data = PreTrainingDataModule( + paths=args.data_path, + seq_length=2048, + global_batch_size=32, + seed=1234, + tokenizer=tokenizer, + ) + gpt_config = llm.GPTConfig( + num_layers=12, + hidden_size=768, + ffn_hidden_size=3072, + num_attention_heads=12, + seq_length=seq_length, + init_method_std=0.023, + 
hidden_dropout=0.1, + attention_dropout=0.1, + layernorm_epsilon=1e-5, + make_vocab_size_divisible_by=128, + ) + model = llm.GPTModel(gpt_config, tokenizer=data.tokenizer) + strategy = nl.MegatronStrategy() + checkpoint_callback = ModelCheckpoint( + every_n_train_steps=5000, + enable_nemo_ckpt_io=False, + async_save=False, + ) + callbacks = [checkpoint_callback] + + loggers = [] + tensorboard_logger = TensorBoardLogger( + save_dir='dummy', ## NOTE: this gets overwritten by default + ) + loggers.append(tensorboard_logger) + + opt_config = OptimizerConfig( + optimizer='adam', + lr=6e-4, + min_lr=6e-5, + use_distributed_optimizer=False, + bf16=True, + ) + opt = MegatronOptimizerModule(config=opt_config) + + trainer = nl.Trainer( + devices=args.devices, + max_steps=args.max_steps, + accelerator="gpu", + strategy=strategy, + logger=loggers, + callbacks=callbacks, + log_every_n_steps=1, + plugins=nl.MegatronMixedPrecision(precision="bf16-mixed", amp_O2=False), + ) + + nemo_logger = NeMoLogger( + dir=args.experiment_dir, + ) + + train( + model=model, + data=data, + trainer=trainer, + log=nemo_logger, + tokenizer='data', + optim=opt, + ) diff --git a/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py b/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py index c8ab668fc16c..fcf1fb8d1796 100644 --- a/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py +++ b/examples/nlp/machine_translation/nmt_transformer_infer_megatron.py @@ -29,21 +29,12 @@ from nemo.collections.nlp.models.machine_translation.megatron_nmt_model import MegatronNMTModel from nemo.collections.nlp.modules.common.megatron.megatron_init import fake_initialize_model_parallel -from nemo.collections.nlp.modules.common.megatron.utils import ApexGuardDefaults from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy, NLPSaveRestoreConnector from nemo.core.config import hydra_runner from nemo.utils import logging from nemo.utils.app_state import AppState from nemo.utils.model_utils import inject_model_parallel_rank -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator - - HAVE_APEX = True -except (ImportError, ModuleNotFoundError): - ModelType = ApexGuardDefaults() - HAVE_APEX = False - @hydra_runner(config_path="conf", config_name="nmt_megatron_infer") def main(cfg) -> None: @@ -101,13 +92,19 @@ def main(cfg) -> None: src_text.append(line.strip()) if len(src_text) == cfg.batch_size: translations = model.translate( - text=src_text, source_lang=cfg.source_lang, target_lang=cfg.target_lang, + text=src_text, + source_lang=cfg.source_lang, + target_lang=cfg.target_lang, ) for translation in translations: tgt_f.write(translation + "\n") src_text = [] if len(src_text) > 0: - translations = model.translate(text=src_text, source_lang=cfg.source_lang, target_lang=cfg.target_lang,) + translations = model.translate( + text=src_text, + source_lang=cfg.source_lang, + target_lang=cfg.target_lang, + ) for translation in translations: tgt_f.write(translation + "\n") diff --git a/nemo/collections/llm/gpt/data/pre_training.py b/nemo/collections/llm/gpt/data/pre_training.py index 46b407410d31..b405a46f729f 100644 --- a/nemo/collections/llm/gpt/data/pre_training.py +++ b/nemo/collections/llm/gpt/data/pre_training.py @@ -173,10 +173,8 @@ def load_state_dict(self, state_dict: Dict[str, Any]) -> None: state_dict: the datamodule state returned by ``state_dict``. 
""" - try: - from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR - except ModuleNotFoundError: - from nemo.lightning.apex_utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + consumed_samples = state_dict['consumed_samples'] self.data_sampler.init_consumed_samples = consumed_samples self.data_sampler.prev_consumed_samples = consumed_samples diff --git a/nemo/collections/llm/gpt/model/base.py b/nemo/collections/llm/gpt/model/base.py index fce268680272..0e4fabe020af 100644 --- a/nemo/collections/llm/gpt/model/base.py +++ b/nemo/collections/llm/gpt/model/base.py @@ -14,6 +14,12 @@ from nemo.lightning.megatron_parallel import MaskedTokenLossReduction from nemo.lightning.pytorch.optim import MegatronOptimizerModule, OptimizerModule +HAVE_TE = True +try: + import transformer_engine +except (ImportError, ModuleNotFoundError): + HAVE_TE = False + if TYPE_CHECKING: from megatron.core.models.gpt.gpt_model import GPTModel as MCoreGPTModel @@ -80,6 +86,13 @@ def local_layer_spec(config: "GPTConfig") -> ModuleSpec: ) +def default_layer_spec(config: "GPTConfig") -> ModuleSpec: + if HAVE_TE: + return transformer_engine_layer_spec(config) + else: + return local_layer_spec(config) + + @dataclass class GPTConfig(TransformerConfig, io.IOMixin): # From megatron.core.models.gpt.gpt_model.GPTModel @@ -96,7 +109,7 @@ class GPTConfig(TransformerConfig, io.IOMixin): # TODO: Move this to better places? get_attention_mask_from_fusion: bool = False - transformer_layer_spec: Union[ModuleSpec, Callable[["GPTConfig"], ModuleSpec]] = transformer_engine_layer_spec + transformer_layer_spec: Union[ModuleSpec, Callable[["GPTConfig"], ModuleSpec]] = default_layer_spec forward_step_fn: Callable = gpt_forward_step data_step_fn: Callable = gpt_data_step diff --git a/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py b/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py index 92f13c28c287..92066b89c1a1 100644 --- a/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py +++ b/nemo/collections/multimodal/models/multimodal_llm/neva/neva_model.py @@ -65,19 +65,10 @@ from nemo.core.classes.common import PretrainedModelInfo from nemo.utils import logging -try: - import apex.transformer.pipeline_parallel.utils - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - try: from megatron.core import InferenceParams, dist_checkpointing, parallel_state, tensor_parallel from megatron.core.models.gpt import GPTModel as MCoreGPTModel + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.utils import make_sharded_tensors_for_checkpoint diff --git a/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py b/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py index 3f59eb66c81a..fc661d91ab61 100644 --- a/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py +++ b/nemo/collections/multimodal/models/text_to_image/controlnet/controlnet.py @@ -48,7 +48,6 @@ try: from apex import amp from apex.transformer.enums import AttnMaskType - from apex.transformer.pipeline_parallel.utils import get_num_microbatches HAVE_APEX = True except (ImportError, ModuleNotFoundError): @@ -56,6 +55,7 @@ 
try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -380,7 +380,9 @@ def __init__( time_embed_dim = model_channels * 4 self.time_embed = nn.Sequential( - linear(model_channels, time_embed_dim), nn.SiLU(), linear(time_embed_dim, time_embed_dim), + linear(model_channels, time_embed_dim), + nn.SiLU(), + linear(time_embed_dim, time_embed_dim), ) self.input_blocks = nn.ModuleList( @@ -505,24 +507,26 @@ def __init__( use_checkpoint=use_checkpoint, use_scale_shift_norm=use_scale_shift_norm, ), - AttentionBlock( - ch, - use_checkpoint=use_checkpoint, - num_heads=num_heads, - num_head_channels=dim_head, - use_new_attention_order=use_new_attention_order, - ) - if not use_spatial_transformer - else SpatialTransformer( # always uses a self-attn - ch, - num_heads, - dim_head, - depth=transformer_depth, - context_dim=context_dim, - disable_self_attn=disable_middle_self_attn, - use_linear=use_linear_in_transformer, - use_checkpoint=use_checkpoint, - use_flash_attention=use_flash_attention, + ( + AttentionBlock( + ch, + use_checkpoint=use_checkpoint, + num_heads=num_heads, + num_head_channels=dim_head, + use_new_attention_order=use_new_attention_order, + ) + if not use_spatial_transformer + else SpatialTransformer( # always uses a self-attn + ch, + num_heads, + dim_head, + depth=transformer_depth, + context_dim=context_dim, + disable_self_attn=disable_middle_self_attn, + use_linear=use_linear_in_transformer, + use_checkpoint=use_checkpoint, + use_flash_attention=use_flash_attention, + ) ), ResBlock( ch, @@ -684,7 +688,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -728,12 +735,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the apex fwd/bwd functions self._optimizer.zero_grad() @@ -777,20 +784,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. 
+ """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -803,8 +810,8 @@ def _append_sequence_parallel_module_grads(self, module, grads): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the global batch for apex fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the global batch for apex fwd/bwd functions. + Global batch is a list of micro batches. """ # noise_map, condition batch[self.cfg.first_stage_key] = batch[self.cfg.first_stage_key].cuda(non_blocking=True) @@ -814,7 +821,8 @@ def process_batch(batch): # SD has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): x, c = self.model.get_input(batch, self.cfg.first_stage_key) @@ -881,7 +889,7 @@ def validation_step(self, batch, batch_idx): self.log_dict(val_loss_dict, prog_bar=False, logger=True, on_step=False, on_epoch=True) def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -935,7 +943,8 @@ def build_train_valid_test_datasets(self): if self.cfg.first_stage_key.endswith("encoded"): self._train_ds, self._validation_ds = build_train_valid_precached_datasets( - model_cfg=self.cfg, consumed_samples=self.compute_consumed_samples(0), + model_cfg=self.cfg, + consumed_samples=self.compute_consumed_samples(0), ) else: self._train_ds, self._validation_ds = build_train_valid_datasets( @@ -989,20 +998,23 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. 
+ """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. """ if self.trainer.accumulate_grad_batches > 1: raise ValueError( diff --git a/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py b/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py index 0b830ac7319b..9db63c2abfce 100644 --- a/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py +++ b/nemo/collections/multimodal/models/text_to_image/dreambooth/dreambooth.py @@ -37,7 +37,6 @@ try: from apex import amp from apex.transformer.enums import AttnMaskType - from apex.transformer.pipeline_parallel.utils import get_num_microbatches HAVE_APEX = True except (ImportError, ModuleNotFoundError): @@ -45,6 +44,7 @@ try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -99,7 +99,9 @@ def __init__(self, cfg, model_parallel_config): self.get_noise_scheduler(self.cfg.noise_scheduler) self.model_type = None - self.rng = torch.Generator(device=torch.cuda.current_device(),) + self.rng = torch.Generator( + device=torch.cuda.current_device(), + ) self.use_cached_latents = self.cfg.use_cached_latents @@ -246,7 +248,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -291,12 +296,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the apex fwd/bwd functions @@ -351,20 +356,20 @@ def validation_step(self, dataloader_iter): return loss def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. 
- We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -381,7 +386,8 @@ def process_batch(batch): prompts, images = batch # DB has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): images = images.cuda(non_blocking=True) @@ -412,7 +418,7 @@ def fwd_output_only_func(batch, model): return fwd_output_only_func def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -472,9 +478,9 @@ def setup_training_data(self, cfg): center_crop=cfg.center_crop, load_cache_latents=self.model.use_cached_latents, cached_instance_data_root=self.cfg.data.get("cached_instance_dir", None), - cached_reg_data_root=self.cfg.data.get("cached_reg_dir", None) - if self.cfg.with_prior_preservation - else None, + cached_reg_data_root=( + self.cfg.data.get("cached_reg_dir", None) if self.cfg.with_prior_preservation else None + ), vae=self.model.vae, text_encoder=self.model.text_encoder, ) @@ -505,16 +511,16 @@ def setup_test_data(self, cfg): pass def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. 
""" if self.trainer.accumulate_grad_batches > 1: raise ValueError( diff --git a/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py b/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py index 4fa6cd230e03..9dd52543f7bc 100644 --- a/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py +++ b/nemo/collections/multimodal/models/text_to_image/imagen/imagen.py @@ -34,7 +34,6 @@ try: from apex import amp - from apex.transformer.pipeline_parallel.utils import get_num_microbatches HAVE_APEX = True except (ImportError, ModuleNotFoundError): @@ -42,6 +41,7 @@ try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -218,8 +218,8 @@ def model_provider_func(self, pre_process=True, post_process=True): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the batch for megatron fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the batch for megatron fwd/bwd functions. + Global batch is a list of micro batches. """ # Base model and SR models have slightly different batch input: # Base model would only require images (64x64), @@ -323,7 +323,10 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def fwd_bwd_step(self, dataloader_iter, forward_only): @@ -332,7 +335,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -379,12 +385,12 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the megatron-core fwd/bwd functions @@ -434,20 +440,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. 
- We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -460,10 +466,10 @@ def _append_sequence_parallel_module_grads(self, module, grads): def validation_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. """ + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions.""" loss, val_loss_dict = self.fwd_bwd_step(dataloader_iter, True) @@ -471,7 +477,7 @@ def validation_step(self, dataloader_iter): return loss def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -520,16 +526,16 @@ def setup(self, stage=None): self.model.setup_rng() def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. 
""" if self.trainer.accumulate_grad_batches > 1: raise ValueError( @@ -558,7 +564,10 @@ def on_load_checkpoint(self, checkpoint) -> None: inductor_enabled = self.cfg.get('inductor', False) state_dict = checkpoint['state_dict'] inductor_checkpoint = False - for k, v, in state_dict.items(): + for ( + k, + v, + ) in state_dict.items(): if '_orig_mod' in k: inductor_checkpoint = True break diff --git a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py index 755588202ef0..f099c9d41837 100644 --- a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py +++ b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/diffusion_engine.py @@ -55,7 +55,6 @@ try: from apex import amp from apex.transformer.enums import AttnMaskType - from apex.transformer.pipeline_parallel.utils import get_num_microbatches HAVE_APEX = True except (ImportError, ModuleNotFoundError): @@ -63,6 +62,7 @@ try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py index 6ea4314ab71f..17599d4b0932 100644 --- a/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py +++ b/nemo/collections/multimodal/models/text_to_image/stable_diffusion/ldm/ddpm.py @@ -78,7 +78,6 @@ try: from apex import amp from apex.transformer.enums import AttnMaskType - from apex.transformer.pipeline_parallel.utils import get_num_microbatches HAVE_APEX = True except (ImportError, ModuleNotFoundError): @@ -86,6 +85,7 @@ try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -163,7 +163,9 @@ def __init__(self, cfg): cuda_graph_enabled = cfg.get("capture_cudagraph_iters", -1) >= 0 if not cuda_graph_enabled: logging.info("Use custom random generator") - self.rng = torch.Generator(device=torch.cuda.current_device(),) + self.rng = torch.Generator( + device=torch.cuda.current_device(), + ) else: logging.info("Use system random generator since CUDA graph enabled") self.rng = None @@ -222,14 +224,12 @@ def register_schedule( ) if self.parameterization == "eps": - lvlb_weights = self.betas ** 2 / ( - 2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod) - ) + lvlb_weights = self.betas**2 / (2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod)) elif self.parameterization == "x0": lvlb_weights = 0.5 * np.sqrt(torch.Tensor(alphas_cumprod)) / (2.0 * 1 - torch.Tensor(alphas_cumprod)) elif self.parameterization == "v": lvlb_weights = torch.ones_like( - self.betas ** 2 / (2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod)) + self.betas**2 / (2 * self.posterior_variance * to_torch(alphas) * (1 - self.alphas_cumprod)) ) else: raise NotImplementedError("mu not supported") @@ -239,7 +239,13 @@ def register_schedule( assert not torch.isnan(self.lvlb_weights).all() def init_from_ckpt( - self, path, ignore_keys=list(), only_model=False, load_vae=True, load_unet=True, load_encoder=True, + self, + path, + ignore_keys=list(), + 
only_model=False, + load_vae=True, + load_unet=True, + load_encoder=True, ): pl_sd = torch.load(path, map_location="cpu") if "state_dict" in list(pl_sd.keys()): @@ -561,7 +567,11 @@ def __init__(self, cfg, model_parallel_config): load_encoder = True if cfg.get("load_encoder", None) is None else cfg.load_encoder self.init_from_ckpt( - ckpt_path, ignore_keys, load_vae=load_vae, load_unet=load_unet, load_encoder=load_encoder, + ckpt_path, + ignore_keys, + load_vae=load_vae, + load_unet=load_unet, + load_encoder=load_encoder, ) self.restarted_from_ckpt = True @@ -569,7 +579,9 @@ def __init__(self, cfg, model_parallel_config): self.first_stage_model = self.first_stage_model.to(memory_format=torch.channels_last) self.model = self.model.to(memory_format=torch.channels_last) - def make_cond_schedule(self,): + def make_cond_schedule( + self, + ): self.cond_ids = torch.full(size=(self.num_timesteps,), fill_value=self.num_timesteps - 1, dtype=torch.long) ids = torch.round(torch.linspace(0, self.num_timesteps - 1, self.num_timesteps_cond)).long() self.cond_ids[: self.num_timesteps_cond] = ids @@ -686,7 +698,9 @@ def delta_border(self, h, w): def get_weighting(self, h, w, Ly, Lx, device): weighting = self.delta_border(h, w) weighting = torch.clip( - weighting, self.split_input_params["clip_min_weight"], self.split_input_params["clip_max_weight"], + weighting, + self.split_input_params["clip_min_weight"], + self.split_input_params["clip_max_weight"], ) weighting = weighting.view(1, h * w, 1).repeat(1, 1, Ly * Lx).to(device) @@ -1322,9 +1336,11 @@ def progressive_denoising( if cond is not None: if isinstance(cond, dict): cond = { - key: cond[key][:batch_size] - if not isinstance(cond[key], list) - else list(map(lambda x: x[:batch_size], cond[key])) + key: ( + cond[key][:batch_size] + if not isinstance(cond[key], list) + else list(map(lambda x: x[:batch_size], cond[key])) + ) for key in cond } else: @@ -1458,9 +1474,11 @@ def sample( if cond is not None: if isinstance(cond, dict): cond = { - key: cond[key][:batch_size] - if not isinstance(cond[key], list) - else list(map(lambda x: x[:batch_size], cond[key])) + key: ( + cond[key][:batch_size] + if not isinstance(cond[key], list) + else list(map(lambda x: x[:batch_size], cond[key])) + ) for key in cond } else: @@ -1731,7 +1749,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): # handle asynchronous grad reduction no_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) # pipeline schedules will get these from self.model.config for module in self.get_module_list(): @@ -1779,29 +1800,31 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): if self.loss_broadcast_src_rank is None: self.loss_broadcast_src_rank = parallel_state.get_pipeline_model_parallel_last_rank() torch.distributed.broadcast( - loss_mean, self.loss_broadcast_src_rank, group=parallel_state.get_pipeline_model_parallel_group(), + loss_mean, + self.loss_broadcast_src_rank, + group=parallel_state.get_pipeline_model_parallel_group(), ) return loss_mean, loss_dict def training_step(self, batch): """ - Notice: `training_step` used to have the following signature to support pipeline - parallelism: - - def training_step(self, dataloader_iter, batch_idx): - - However, full iteration CUDA Graph callback is not compatible with this signature - right now, due to we need to wrap the dataloader 
to generate static tensor outside - the CUDA Graph. This signature moves `next(dataloader)` into the CUDA Graph - capturing region, thus we disabled it. - - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Notice: `training_step` used to have the following signature to support pipeline + parallelism: + + def training_step(self, dataloader_iter, batch_idx): + + However, full iteration CUDA Graph callback is not compatible with this signature + right now, due to we need to wrap the dataloader to generate static tensor outside + the CUDA Graph. This signature moves `next(dataloader)` into the CUDA Graph + capturing region, thus we disabled it. + + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # we zero grads here because we also call backward in the megatron-core fwd/bwd functions @@ -1875,20 +1898,20 @@ def non_cuda_graph_capturable(self): self.log("timestamp", ts, batch_size=1, rank_zero_only=True) def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -1901,8 +1924,8 @@ def _append_sequence_parallel_module_grads(self, module, grads): def get_forward_output_and_loss_func(self): def process_batch(batch): - """ Prepares the global batch for apex fwd/bwd functions. - Global batch is a list of micro batches. + """Prepares the global batch for apex fwd/bwd functions. + Global batch is a list of micro batches. 
""" # noise_map, condition batch[self.cfg.first_stage_key] = batch[self.cfg.first_stage_key].cuda(non_blocking=True) @@ -1912,7 +1935,8 @@ def process_batch(batch): # SD has more dedicated structure for encoding, so we enable autocasting here as well with torch.cuda.amp.autocast( - self.autocast_dtype in (torch.half, torch.bfloat16), dtype=self.autocast_dtype, + self.autocast_dtype in (torch.half, torch.bfloat16), + dtype=self.autocast_dtype, ): x, c = self.model.get_input(batch, self.cfg.first_stage_key) @@ -1959,7 +1983,7 @@ def validation_step(self, dataloader_iter): return loss def setup(self, stage=None): - """ PTL hook that is executed after DDP spawns. + """PTL hook that is executed after DDP spawns. We setup datasets here as megatron datasets require DDP to instantiate. See https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#setup for more information. Args: @@ -2016,11 +2040,13 @@ def build_train_valid_test_datasets(self): if self.cfg.first_stage_key.endswith("encoded") or self.cfg.first_stage_key.endswith("moments"): if self.cfg.cond_stage_key.endswith("clip_encoded"): self._train_ds, self._validation_ds = build_train_valid_precached_clip_datasets( - model_cfg=self.cfg, consumed_samples=self.compute_consumed_samples(0), + model_cfg=self.cfg, + consumed_samples=self.compute_consumed_samples(0), ) else: self._train_ds, self._validation_ds = build_train_valid_precached_datasets( - model_cfg=self.cfg, consumed_samples=self.compute_consumed_samples(0), + model_cfg=self.cfg, + consumed_samples=self.compute_consumed_samples(0), ) else: self._train_ds, self._validation_ds = build_train_valid_datasets( @@ -2045,7 +2071,8 @@ def setup_training_data(self, cfg): ) if self.cfg.cond_stage_key.endswith("clip_encoded"): collate_fn = get_collate_fn( - first_stage_key=self.cfg.first_stage_key, cond_stage_key=self.cfg.cond_stage_key, + first_stage_key=self.cfg.first_stage_key, + cond_stage_key=self.cfg.cond_stage_key, ) else: collate_fn = None @@ -2082,20 +2109,23 @@ def setup_test_data(self, cfg): f'Setting up test dataloader with len(len(self._test_ds)): {len(self._test_ds)} and consumed samples: {consumed_samples}' ) self._test_dl = torch.utils.data.DataLoader( - self._test_ds, batch_size=self._micro_batch_size, num_workers=cfg.num_workers, pin_memory=True, + self._test_ds, + batch_size=self._micro_batch_size, + num_workers=cfg.num_workers, + pin_memory=True, ) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. 
""" if self.trainer.accumulate_grad_batches > 1: raise ValueError( @@ -2253,23 +2283,26 @@ def _check_and_add_adapter(self, name, module, peft_name, peft_cfg, name_key_to_ ) def load_adapters( - self, filepath: str, peft_cfgs: Optional[Union[PEFTConfig, List[PEFTConfig]]] = None, map_location: str = None, + self, + filepath: str, + peft_cfgs: Optional[Union[PEFTConfig, List[PEFTConfig]]] = None, + map_location: str = None, ): """ - Utility method that restores only the adapter module(s), and not the entire model itself. - This allows the sharing of adapters which are often just a fraction of the size of the full model, - enabling easier deliver. + Utility method that restores only the adapter module(s), and not the entire model itself. + This allows the sharing of adapters which are often just a fraction of the size of the full model, + enabling easier deliver. - .. note:: + .. note:: - During restoration, assumes that the model does not currently already have one or more adapter modules. + During restoration, assumes that the model does not currently already have one or more adapter modules. - Args: - filepath: Filepath of the .ckpt or .nemo file. - peft_cfgs: One or more PEFTConfig objects that specify the PEFT method configuration. - If none, will infer from the .nemo checkpoint - map_location: Pytorch flag, where to place the adapter(s) state dict(s). - """ + Args: + filepath: Filepath of the .ckpt or .nemo file. + peft_cfgs: One or more PEFTConfig objects that specify the PEFT method configuration. + If none, will infer from the .nemo checkpoint + map_location: Pytorch flag, where to place the adapter(s) state dict(s). + """ def _modify_state_dict(state_dict): # Modify state key for Dreambooth inference @@ -2310,7 +2343,11 @@ def _modify_state_dict(state_dict): class DiffusionWrapper(pl.LightningModule, Serialization): def __init__( - self, diff_model_config, conditioning_key, inductor: bool = False, inductor_cudagraphs: bool = False, + self, + diff_model_config, + conditioning_key, + inductor: bool = False, + inductor_cudagraphs: bool = False, ): super().__init__() self.diffusion_model = DiffusionWrapper.from_config_dict(diff_model_config) diff --git a/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py b/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py index a83960307672..d811ce94dbea 100644 --- a/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py +++ b/nemo/collections/multimodal/models/vision_language_foundation/clip/megatron_clip_models.py @@ -54,7 +54,6 @@ try: from apex.transformer.enums import AttnMaskType - from apex.transformer.pipeline_parallel.utils import get_num_microbatches HAVE_APEX = True except (ImportError, ModuleNotFoundError): @@ -67,6 +66,7 @@ from megatron.core.fusions.fused_bias_dropout import get_bias_dropout_add from megatron.core.models.gpt import GPTModel as MCoreGPTModel from megatron.core.models.vision.clip_vit_model import CLIPViTModel + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.attention import CrossAttention, CrossAttentionSubmodules from megatron.core.transformer.custom_layers.transformer_engine import ( diff --git a/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py 
b/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py index 24c2bfc58be7..7b127335d336 100644 --- a/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py +++ b/nemo/collections/multimodal/models/vision_language_foundation/megatron_nsfw_clip_models.py @@ -19,8 +19,8 @@ import torch import torch.nn as nn import torch.nn.functional as F -from apex.transformer.pipeline_parallel.utils import get_num_microbatches from megatron.core import parallel_state +from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from omegaconf.dictconfig import DictConfig from pytorch_lightning.accelerators import CPUAccelerator diff --git a/nemo/collections/multimodal/speech_llm/models/modular_models.py b/nemo/collections/multimodal/speech_llm/models/modular_models.py index cce74e7b6a1d..aa21cf95bfa4 100644 --- a/nemo/collections/multimodal/speech_llm/models/modular_models.py +++ b/nemo/collections/multimodal/speech_llm/models/modular_models.py @@ -55,18 +55,13 @@ from nemo.core.classes.common import PretrainedModelInfo from nemo.core.classes.mixins import adapter_mixins from nemo.utils import AppState, logging, model_utils +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator from nemo.utils.model_utils import inject_model_parallel_rank -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator, get_num_microbatches - - HAVE_APEX = True -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False - try: from megatron.core import InferenceParams, parallel_state, tensor_parallel from megatron.core.models.gpt import GPTModel as MCoreGPTModel + from megatron.core.num_microbatches_calculator import get_num_microbatches HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py b/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py index a96ee823e197..e9dacca17bc4 100644 --- a/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py +++ b/nemo/collections/multimodal/speech_llm/models/modular_t5_models.py @@ -49,22 +49,11 @@ from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.core.classes.mixins import adapter_mixins from nemo.utils import AppState, logging, model_utils - -try: - from apex.transformer.pipeline_parallel.utils import ( - _reconfigure_microbatch_calculator, - get_current_global_batch_size, - get_micro_batch_size, - get_num_microbatches, - ) - - HAVE_APEX = True -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False -from nemo.collections.nlp.models.language_modeling.megatron_t5_model import MegatronT5Model +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state, tensor_parallel + from megatron.core.num_microbatches_calculator import get_current_global_batch_size, get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/multimodal/speech_llm/modules/common/audio_text_generation_utils.py b/nemo/collections/multimodal/speech_llm/modules/common/audio_text_generation_utils.py index 136418031586..bb183d45ea2d 100644 --- a/nemo/collections/multimodal/speech_llm/modules/common/audio_text_generation_utils.py +++ b/nemo/collections/multimodal/speech_llm/modules/common/audio_text_generation_utils.py @@ -28,15 
+28,7 @@ ) from nemo.collections.nlp.modules.common.transformer.text_generation import OutputType from nemo.utils import AppState - -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator try: from megatron.core import parallel_state, tensor_parallel diff --git a/nemo/collections/nlp/data/language_modeling/megatron/retro_dataset.py b/nemo/collections/nlp/data/language_modeling/megatron/retro_dataset.py index 7d604c0b51bc..d8dafd69c658 100644 --- a/nemo/collections/nlp/data/language_modeling/megatron/retro_dataset.py +++ b/nemo/collections/nlp/data/language_modeling/megatron/retro_dataset.py @@ -45,11 +45,14 @@ from nemo.collections.nlp.modules.common.megatron.utils import get_ltor_masks_and_position_ids - HAVE_MEGATRON_CORE = True + HAVE_TE_AND_MEGATRON_CORE = True except (ImportError, ModuleNotFoundError): - HAVE_MEGATRON_CORE = False + HAVE_TE_AND_MEGATRON_CORE = False + from typing import Any + + RetroConfig = Any class RETRODataset(Dataset): @@ -129,57 +132,64 @@ def build_train_valid_test_datasets( tokenizer, ): - # gpt dataset - train_ds, valid_ds, test_ds = gpt_train_valid_test_datasets_provider(cfg, train_valid_test_num_samples, tokenizer) + if HAVE_TE_AND_MEGATRON_CORE: - gpt_datasets = { - "train": (train_ds, train_valid_test_num_samples[0]), - "valid": (valid_ds, train_valid_test_num_samples[1]), - "test": (test_ds, train_valid_test_num_samples[2]), - } + # gpt dataset + train_ds, valid_ds, test_ds = gpt_train_valid_test_datasets_provider( + cfg, train_valid_test_num_samples, tokenizer + ) - retro_train_ds, retro_valid_ds, retro_test_ds = get_retro_datasets( - config=retro_config, - gpt_datasets=gpt_datasets, - sample_length=seq_length, - eod_token_id=tokenizer.eos_id, - ) + gpt_datasets = { + "train": (train_ds, train_valid_test_num_samples[0]), + "valid": (valid_ds, train_valid_test_num_samples[1]), + "test": (test_ds, train_valid_test_num_samples[2]), + } - train_ds = ( - RETRODataset( - cfg=cfg, - retro_config=retro_config, - tokenizer=tokenizer, - mcore_retro_dataset=retro_train_ds, - number_samples_with_neighbors=train_valid_test_num_samples[0], + retro_train_ds, retro_valid_ds, retro_test_ds = get_retro_datasets( + config=retro_config, + gpt_datasets=gpt_datasets, + sample_length=seq_length, + eod_token_id=tokenizer.eos_id, ) - if retro_train_ds - else None - ) - valid_ds = ( - RETRODataset( - cfg=cfg, - retro_config=retro_config, - tokenizer=tokenizer, - mcore_retro_dataset=retro_valid_ds, - number_samples_with_neighbors=train_valid_test_num_samples[1], + + train_ds = ( + RETRODataset( + cfg=cfg, + retro_config=retro_config, + tokenizer=tokenizer, + mcore_retro_dataset=retro_train_ds, + number_samples_with_neighbors=train_valid_test_num_samples[0], + ) + if retro_train_ds + else None ) - if retro_valid_ds - else None - ) - test_ds = ( - RETRODataset( - cfg=cfg, - retro_config=retro_config, - tokenizer=tokenizer, - mcore_retro_dataset=retro_test_ds, - number_samples_with_neighbors=train_valid_test_num_samples[2], + valid_ds = ( + RETRODataset( + cfg=cfg, + retro_config=retro_config, + tokenizer=tokenizer, + mcore_retro_dataset=retro_valid_ds, + number_samples_with_neighbors=train_valid_test_num_samples[1], + ) + if retro_valid_ds + else None + ) + test_ds = ( + RETRODataset( + cfg=cfg, + retro_config=retro_config, + tokenizer=tokenizer, + mcore_retro_dataset=retro_test_ds, + 
number_samples_with_neighbors=train_valid_test_num_samples[2], + ) + if retro_test_ds + else None ) - if retro_test_ds - else None - ) - return train_ds, valid_ds, test_ds + return train_ds, valid_ds, test_ds + else: + logging.warn('Megatron core is not installed. Returning None') + return def gpt_train_valid_test_datasets_provider(cfg, train_val_test_num_samples, tokenizer): diff --git a/nemo/collections/nlp/models/dialogue/dialogue_s2s_generation_model.py b/nemo/collections/nlp/models/dialogue/dialogue_s2s_generation_model.py index 73f09f62b1d5..f3b77493e0df 100644 --- a/nemo/collections/nlp/models/dialogue/dialogue_s2s_generation_model.py +++ b/nemo/collections/nlp/models/dialogue/dialogue_s2s_generation_model.py @@ -32,16 +32,9 @@ from nemo.collections.nlp.models.nlp_model import NLPModel from nemo.core.classes.common import PretrainedModelInfo from nemo.utils import logging +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator from nemo.utils.decorators import deprecated_warning -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator - - HAVE_APEX = True -except: - HAVE_APEX = False - - __all__ = ['DialogueS2SGenerationModel'] diff --git a/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py b/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py index 102ab5ec0f84..485c49cbd927 100644 --- a/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py +++ b/nemo/collections/nlp/models/information_retrieval/megatron_bert_embedding_model.py @@ -14,17 +14,9 @@ import logging -try: - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module @@ -435,7 +427,10 @@ def training_step(self, dataloader_iter): self.log('lr', lr, batch_size=1) self.log('global_step', self.trainer.global_step, prog_bar=True, batch_size=1) self.log( - 'consumed_samples', self._compute_consumed_samples_after_training_step(), prog_bar=True, batch_size=1, + 'consumed_samples', + self._compute_consumed_samples_after_training_step(), + prog_bar=True, + batch_size=1, ) return loss_mean[0] @@ -488,7 +483,12 @@ def loss_func(self, output_tensor): ] # List of length "num_negatives", each tensor of shape (bs, embedding_dim) hard_negs_scores = ( - torch.multiply(queries.unsqueeze(0).repeat(len(hard_negs), 1, 1), torch.stack(hard_negs),).sum(axis=-1).T + torch.multiply( + queries.unsqueeze(0).repeat(len(hard_negs), 1, 1), + torch.stack(hard_negs), + ) + .sum(axis=-1) + .T ) # shape = (bs, num_negatives); Hard negatives are not shared between queries. 
scores = torch.cat([pos_inbatch_negs_scores, hard_negs_scores], axis=1) diff --git a/nemo/collections/nlp/models/language_modeling/megatron/gpt_layer_modelopt_spec.py b/nemo/collections/nlp/models/language_modeling/megatron/gpt_layer_modelopt_spec.py index f001e8f58d25..e7d9ff8aacab 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron/gpt_layer_modelopt_spec.py +++ b/nemo/collections/nlp/models/language_modeling/megatron/gpt_layer_modelopt_spec.py @@ -31,6 +31,8 @@ except (ImportError, ModuleNotFoundError) as e: + from nemo.collections.nlp.modules.common.megatron.utils import ApexGuardDefaults + TransformerLayer = TransformerLayerSubmodules = ApexGuardDefaults MLP = MLPSubmodules = ModuleSpec = IdentityOp = ApexGuardDefaults AttnMaskType = DotProductAttention = TENorm = ApexGuardDefaults diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py index 6156cd719289..20d532d4764a 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_model.py @@ -47,19 +47,10 @@ from nemo.utils import AppState, logging, str_to_dtype from nemo.utils.get_rank import is_global_rank_zero -try: - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - - try: from megatron.core import ModelParallelConfig, parallel_state from megatron.core.distributed import DistributedDataParallel as McoreDDP + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig from megatron.core.utils import init_method_normal, scaled_init_method_normal @@ -917,7 +908,7 @@ def compute_consumed_samples(self, steps_since_resume=0): app_state = AppState() if self.cfg.get('rampup_batch_size', None): - from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR current_global_batch_size = getattr(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, 'current_global_batch_size', 1) consumed_samples = self.prev_consumed_samples + self.if_first_step * current_global_batch_size diff --git a/nemo/collections/nlp/models/language_modeling/megatron_base_prompt_learning_model.py b/nemo/collections/nlp/models/language_modeling/megatron_base_prompt_learning_model.py index f6ee4b20183c..0e03e8994dc2 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_base_prompt_learning_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_base_prompt_learning_model.py @@ -37,16 +37,9 @@ from nemo.collections.nlp.modules.common.transformer.text_generation import TextGeneration from nemo.collections.nlp.parts.nlp_overrides import GradScaler from nemo.utils import AppState, logging +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator from nemo.utils.decorators import deprecated_warning -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False - try: from megatron.core import ModelParallelConfig, parallel_state diff --git a/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py 
b/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py index 984fca5f1259..701d24d5b942 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_bert_model.py @@ -46,15 +46,6 @@ from nemo.core.neural_types import ChannelType, MaskType, NeuralType from nemo.utils import logging -try: - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - try: import logging @@ -67,6 +58,7 @@ try: from megatron.core import parallel_state from megatron.core.models.bert.bert_layer_specs import bert_layer_with_transformer_engine_spec + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig @@ -214,8 +206,8 @@ def model_provider_func(self, pre_process, post_process): return model def _validate_trainer(self): - """ Certain trainer configurations can break training. - Here we try to catch them and raise an error. + """Certain trainer configurations can break training. + Here we try to catch them and raise an error. """ if self.trainer.accumulate_grad_batches > 1: raise ValueError( @@ -308,7 +300,11 @@ def forward( model = self.model if self.mcore_bert: - output_tensor = model(input_ids, attention_mask, tokentype_ids=token_type_ids,) + output_tensor = model( + input_ids, + attention_mask, + tokentype_ids=token_type_ids, + ) else: output_tensor = model( input_ids, @@ -423,21 +419,24 @@ def training_step(self, dataloader_iter): self.log('lr', lr, batch_size=1) self.log('global_step', self.trainer.global_step, prog_bar=True, batch_size=1) self.log( - 'consumed_samples', self._compute_consumed_samples_after_training_step(), prog_bar=True, batch_size=1, + 'consumed_samples', + self._compute_consumed_samples_after_training_step(), + prog_bar=True, + batch_size=1, ) return loss_mean[0] def _make_data_iterator_list(self, data_iterator: Iterator) -> List[Iterator]: - """ Convert data iterator into form expected by Megatron - With interleaved pipeline parallelism, Megatron expects a - list of one data iterator per model chunk. Each model - chunk independently gets data from its data iterator, so - we need to interact with the data iterator multiple times - for each microbatch step. Instead of incorporating this - logic into the data loader, we cache the iterator's output - to the first model chunk and reuse it in the other model - chunks. + """Convert data iterator into form expected by Megatron + With interleaved pipeline parallelism, Megatron expects a + list of one data iterator per model chunk. Each model + chunk independently gets data from its data iterator, so + we need to interact with the data iterator multiple times + for each microbatch step. Instead of incorporating this + logic into the data loader, we cache the iterator's output + to the first model chunk and reuse it in the other model + chunks. 
""" if not isinstance(self.model, list) or len(self.model) == 1: @@ -703,9 +702,9 @@ def build_train_valid_test_datasets(self): ] if self.trainer.limit_val_batches <= 1.0 and isinstance(self.trainer.limit_val_batches, float): - train_valid_test_num_samples[ - 1 - ] = 1 # This is to make sure we only have one epoch on every validation iteration + train_valid_test_num_samples[1] = ( + 1 # This is to make sure we only have one epoch on every validation iteration + ) self._train_ds, self._validation_ds, self._test_ds = dataset_utils.build_train_valid_test_datasets( cfg=self.cfg, @@ -739,20 +738,20 @@ def build_train_valid_test_datasets(self): return self._train_ds, self._validation_ds, self._test_ds def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. + No need to call it here. """ return def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ return def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -822,12 +821,12 @@ def setup(self, stage=None): self.setup_transformer_engine_tp_groups() def setup_transformer_engine_tp_groups(self): - """ This should be called after model parallel groups have been initialized - and only needs to be called when using Transformer Engine. + """This should be called after model parallel groups have been initialized + and only needs to be called when using Transformer Engine. """ for module in self.get_bert_module_list(): """Set TP group - Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398 + Copied from: https://github.com/NVIDIA/TransformerEngine/blob/main/transformer_engine/pytorch/transformer.py#L398 """ # Deep iterate but skip self to avoid infinite recursion. for index, child in enumerate(module.modules()): @@ -849,9 +848,9 @@ def get_bert_module_list(self): return [self.model] def allreduce_sequence_parallel_gradients(self): - """ All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. - Modified from megatron-lm: - https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 + """All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. 
+ Modified from megatron-lm: + https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 """ grads = [] @@ -931,10 +930,10 @@ def setup_test_data(self, cfg): self._test_dl = self.build_pretraining_data_loader(self._test_ds, consumed_samples) def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. + """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device + When using pipeline parallelism, we need the global batch to remain on the CPU, + since the memory overhead will be too high when using a large number of microbatches. + Microbatches are transferred from CPU to GPU inside the pipeline. """ return batch @@ -1154,10 +1153,10 @@ def on_load_checkpoint(self, checkpoint) -> None: parallel_state.set_virtual_pipeline_model_parallel_rank(0) def build_transformer_config(self) -> TransformerConfig: - """ Builds the megatron core gpt transformer config for the model. - For attributes in the nemo model config that are the same - as the megatron core TransformerConfig, we will use the value from the nemo model config. - For attributes in TransformerConfig that are not in the nemo model config, we add custom logic. + """Builds the megatron core gpt transformer config for the model. + For attributes in the nemo model config that are the same + as the megatron core TransformerConfig, we will use the value from the nemo model config. + For attributes in TransformerConfig that are not in the nemo model config, we add custom logic. """ activation = self.cfg.get('activation', 'gelu') assert activation == 'gelu', "Only gelu activation is support for BERT at the moment." 
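The hunks above repeat one pattern across the Megatron model files: `get_num_microbatches` is no longer imported from `apex.transformer.pipeline_parallel.utils` behind a separate `HAVE_APEX` flag, but from `megatron.core.num_microbatches_calculator` inside the existing Megatron-Core import guard. A minimal sketch of that pattern is below; the symbol names come from the hunks in this patch, while the toy initialization values and the `__main__` usage are illustrative only (the real modules just set `HAVE_MEGATRON_CORE = False` and warn when the import fails).

```python
# Sketch of the optional-dependency pattern applied throughout this patch:
# the microbatch calculator now comes from megatron.core, not apex.
try:
    from megatron.core.num_microbatches_calculator import (
        get_num_microbatches,
        init_num_microbatches_calculator,
    )

    HAVE_MEGATRON_CORE = True
except (ImportError, ModuleNotFoundError):
    HAVE_MEGATRON_CORE = False


if __name__ == "__main__" and HAVE_MEGATRON_CORE:
    # Single-process toy setup: global batch 8, micro batch 2, DP size 1 -> 4 microbatches.
    init_num_microbatches_calculator(
        rank=0,
        rampup_batch_size=None,
        global_batch_size=8,
        micro_batch_size=2,
        data_parallel_size=1,
    )
    print(get_num_microbatches())  # expected: 4
```

Call sites such as the `training_step` logging shown above keep working unchanged, since only the import location of `get_num_microbatches` moves.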
diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py index e4cab6cec26f..997235e639d2 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py @@ -45,7 +45,6 @@ from nemo.collections.nlp.models.language_modeling.megatron.gpt_layer_modelopt_spec import get_gpt_layer_modelopt_spec from nemo.collections.nlp.models.language_modeling.megatron.gpt_model import GPTModel from nemo.collections.nlp.models.language_modeling.megatron_base_model import MegatronBaseModel -from nemo.collections.nlp.modules.common.hyena.hyena_spec import get_gpt_layer_with_te_and_hyena_spec from nemo.collections.nlp.modules.common.megatron.build_model import build_model from nemo.collections.nlp.modules.common.megatron.module import Float16Module from nemo.collections.nlp.modules.common.megatron.utils import ( @@ -77,16 +76,6 @@ from nemo.utils import logging from nemo.utils.te_utils import is_float8tensor -try: - import apex.transformer.pipeline_parallel.utils - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - try: from megatron.core import InferenceParams, parallel_state, tensor_parallel from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder @@ -104,6 +93,7 @@ get_gpt_layer_local_spec, get_gpt_layer_with_transformer_engine_spec, ) + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig @@ -126,6 +116,8 @@ import transformer_engine from transformer_engine.pytorch import module as te_module + from nemo.collections.nlp.modules.common.hyena.hyena_spec import get_gpt_layer_with_te_and_hyena_spec + HAVE_TE = True except (ImportError, ModuleNotFoundError): @@ -145,6 +137,7 @@ def mcore_supports_moe() -> bool: return False +## TODO: This function will not work if TE is not installed def get_specs(spec_name, num_experts=None, moe_grouped_gemm=False, use_te=True, hyena_cfg: Dict = None): if num_experts is not None: assert mcore_supports_moe(), "Megatron-core >= v0.5.0 is required for MoE" @@ -277,10 +270,6 @@ class MegatronGPTModel(MegatronBaseModel, TextGeneration): """ def __init__(self, cfg: DictConfig, trainer: Trainer): - if not HAVE_APEX: - raise ImportError( - "Apex was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt." - ) if not HAVE_MEGATRON_CORE: logging.warning( "megatron-core was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt." 
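With the hard Apex `ImportError` removed from `MegatronGPTModel.__init__`, Transformer Engine becomes the remaining optional piece: its import (and the TE-dependent hyena spec) sits behind a `HAVE_TE` flag, and the TODO above notes that `get_specs` still assumes TE. The sketch below shows how a caller might fall back to the local (pure-PyTorch) GPT layer spec when TE is absent; it is not the actual body of `get_specs`, and the `megatron.core.models.gpt.gpt_layer_specs` module path and no-argument spec calls are assumptions based on the import list in the hunk above.

```python
# Illustrative fallback between the TE-backed and local GPT layer specs.
try:
    import transformer_engine  # noqa: F401

    HAVE_TE = True
except (ImportError, ModuleNotFoundError):
    HAVE_TE = False


def pick_gpt_layer_spec(use_te: bool = True):
    """Return a TE-backed layer spec when TE is importable, otherwise the local spec."""
    if use_te and HAVE_TE:
        from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_with_transformer_engine_spec

        return get_gpt_layer_with_transformer_engine_spec()

    from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec

    return get_gpt_layer_local_spec()
```

The real `get_specs` additionally dispatches on MoE and hyena configurations, which this sketch omits.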
@@ -788,7 +777,9 @@ def training_step(self, dataloader_iter): self.if_init_step = False if self.rampup_batch_size: - num_microbatch_calculator = apex.transformer.pipeline_parallel.utils._GLOBAL_NUM_MICROBATCHES_CALCULATOR + from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + + num_microbatch_calculator = _GLOBAL_NUM_MICROBATCHES_CALCULATOR current_global_batch_size = num_microbatch_calculator.current_global_batch_size # do validation and save the checkpoint when gbs is changed if self.prev_global_batch_size != current_global_batch_size and self.prev_global_batch_size: @@ -1672,7 +1663,9 @@ def setup(self, stage=None): self.init_global_step = self.trainer.global_step if self.rampup_batch_size: - num_microbatch_calculator = apex.transformer.pipeline_parallel.utils._GLOBAL_NUM_MICROBATCHES_CALCULATOR + from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + + num_microbatch_calculator = _GLOBAL_NUM_MICROBATCHES_CALCULATOR num_microbatch_calculator.update(self.init_consumed_samples, consistency_check=False) self.prev_consumed_samples = self.init_consumed_samples diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py index acfc22439a7d..9590c535a86d 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_prompt_learning_model.py @@ -44,19 +44,13 @@ from nemo.collections.nlp.parts.nlp_overrides import GradScaler, NLPSaveRestoreConnector from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging +from nemo.utils.apex_utils import get_micro_batch_size from nemo.utils.decorators import deprecated_warning -try: - from apex.transformer.pipeline_parallel.utils import get_micro_batch_size, get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False - try: from megatron.core import InferenceParams, ModelParallelConfig, parallel_state, tensor_parallel from megatron.core.enums import ModelType + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True diff --git a/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py b/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py index 28bcbf22ac33..9ab17189ca64 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_gpt_sft_model.py @@ -37,21 +37,11 @@ from nemo.collections.nlp.parts.mixins.nlp_adapter_mixins import NLPAdapterModelMixin from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging - -try: - from apex.transformer.pipeline_parallel.utils import ( - _reconfigure_microbatch_calculator, - get_current_global_batch_size, - get_micro_batch_size, - get_num_microbatches, - ) - - HAVE_APEX = True -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_current_global_batch_size, get_num_microbatches from megatron.core.pipeline_parallel.schedules import 
get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -70,10 +60,6 @@ class MegatronGPTSFTModel(NLPAdapterModelMixin, MegatronGPTModel): """ def __init__(self, cfg: DictConfig, trainer: Trainer): - if not HAVE_APEX: - raise ImportError( - "Apex was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt." - ) super().__init__(cfg, trainer=trainer) self.sep_id = cfg.get('sep_id', 49704) if hasattr(self.cfg.data, "validation_ds"): diff --git a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py index 915d745b5b97..c7c175bfa0c1 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_lm_encoder_decoder_model.py @@ -36,7 +36,6 @@ MegatronTokenLevelEncoderDecoderModule, ) from nemo.collections.nlp.modules.common.megatron.utils import ( - ApexGuardDefaults, average_losses_across_data_parallel_group, build_attention_mask_3d, get_params_for_weight_decay_optimization, @@ -47,19 +46,7 @@ ) from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging - -try: - from apex.transformer.pipeline_parallel.utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, - get_num_microbatches, - ) - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state, tensor_parallel @@ -71,6 +58,7 @@ get_t5_encoder_with_local_block_spec, get_t5_encoder_with_transformer_engine_block_spec, ) + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig @@ -1512,7 +1500,7 @@ def dummy(): pad_profile = torch.zeros_like(scores).long() decoder_seq_lengths = torch.zeros_like(scores).fill_(predicted_tokens_dec.size(1) + 1) - # reconfigure batch size for apex since the tensor have been augmented with beam size + # reconfigure batch size since the tensor have been augmented with beam size global_batch_per_gpu = token_ids.shape[0] tensor_shape[1] = global_batch_per_gpu _reconfigure_microbatch_calculator( diff --git a/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py b/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py index 2a8e5713573b..3eb78d34b3f4 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_retro_model.py @@ -67,16 +67,6 @@ from nemo.core.neural_types import ChannelType, NeuralType from nemo.utils import logging -try: - import apex.transformer.pipeline_parallel.utils - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - try: from megatron.core import InferenceParams, parallel_state from megatron.core.models.retro import RetroModel as MCoreRetroModel @@ -84,6 +74,7 @@ from megatron.core.models.retro.decoder_spec import get_retro_decoder_block_spec from megatron.core.models.retro.utils import get_config_path as get_retro_config_path from 
megatron.core.models.retro.utils import get_gpt_data_dir as get_retro_data_dir + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.module import Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_config import TransformerConfig @@ -97,6 +88,7 @@ except (ImportError, ModuleNotFoundError): TransformerConfig = ApexGuardDefaults + RetroConfig = ApexGuardDefaults HAVE_MEGATRON_CORE = False diff --git a/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py b/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py index 4d4d80b71a98..82bd84c8ada8 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_t0_model.py @@ -25,16 +25,7 @@ from nemo.collections.nlp.data.language_modeling.t0_dataset import T0Dataset from nemo.collections.nlp.models.language_modeling.megatron_t5_sft_model import MegatronT5SFTModel from nemo.utils import AppState, logging - -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator try: from megatron.core import parallel_state @@ -194,7 +185,10 @@ def build_train_valid_test_datasets(self, stage): logging.info(f'Length of train dataset: {len(self._train_ds)}') def build_data_loader( - self, dataset, data_cfg, consumed_samples=0, + self, + dataset, + data_cfg, + consumed_samples=0, ): """Buld dataloader given an input dataset.""" logging.info(f'Building dataloader with consumed samples: {consumed_samples}') @@ -224,13 +218,19 @@ def setup_training_dataloader(self): if hasattr(self, '_train_ds'): consumed_samples = self.compute_consumed_samples(0) self._train_dl = self.build_data_loader( - dataset=self._train_ds, data_cfg=self.cfg.data.train_ds, consumed_samples=consumed_samples, + dataset=self._train_ds, + data_cfg=self.cfg.data.train_ds, + consumed_samples=consumed_samples, ) def setup_eval_dataloader(self, datasets, data_cfg): dataloaders = [] for dataset in datasets: - eval_dl = self.build_data_loader(dataset=dataset, data_cfg=data_cfg, consumed_samples=0,) + eval_dl = self.build_data_loader( + dataset=dataset, + data_cfg=data_cfg, + consumed_samples=0, + ) dataloaders.append(eval_dl) return dataloaders diff --git a/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py b/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py index f13be45db836..0773e4abe811 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_t5_prompt_learning_model.py @@ -34,23 +34,12 @@ from nemo.collections.nlp.parts.nlp_overrides import NLPSaveRestoreConnector from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging - -try: - from apex.transformer.pipeline_parallel.utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, - get_num_microbatches, - ) - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state from megatron.core.enums import ModelType + 
from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -65,21 +54,21 @@ class MegatronT5PromptLearningModel(MegatronBasePromptLearningModel): """ - Model class for prompt-tuning or p-tuning a pretrained Megatron T5 model. + Model class for prompt-tuning or p-tuning a pretrained Megatron T5 model. Prompt Tuning initalizes virtual prompt embeddings directly from a copy of certain token embeddings from the the pretrained T5 model's vocabulary - and directly tunes these embedding weights. The token embeddings used in - initalization are specified by the user in the config file. The model can - be prompt-tuned for multiple tasks at once. Virtual prompts are stored in a - prompt table and can be added or deleted without disrupting virtual prompts - for other tasks. + and directly tunes these embedding weights. The token embeddings used in + initalization are specified by the user in the config file. The model can + be prompt-tuned for multiple tasks at once. Virtual prompts are stored in a + prompt table and can be added or deleted without disrupting virtual prompts + for other tasks. P-tuning initializes an LSTM encoder model that generates virtual prompt embeddings for every task. Each task shares the same encoder. After p-tuning is compelete, the learned virtual prompts can be saved to the prompt table - using add_ptuned_prompts_to_prompt_table(). Thus, if a user wants to add a - new virtual prompt via p-tuning, they do not need to retrain on all previous + using add_ptuned_prompts_to_prompt_table(). Thus, if a user wants to add a + new virtual prompt via p-tuning, they do not need to retrain on all previous tasks. This gives p-tuning the same task flexiblity as prompt-tuning. """ @@ -93,7 +82,15 @@ def first_stage_of_pipeline(self): return False def forward( - self, input_ids, dec_input, enc_mask, dec_mask, position_ids, taskname_ids, labels=None, inference=False, + self, + input_ids, + dec_input, + enc_mask, + dec_mask, + position_ids, + taskname_ids, + labels=None, + inference=False, ): """ Special forward method for p-tuning/prompt-tuning pretrained @@ -174,8 +171,8 @@ def load_frozen_model(self, cfg, trainer): def fwd_bwd_step(self, dataloader_iter, batch_idx, forward_only): """ - Dataloader produces a global batch which is turned into a list of microbatches. - The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. + Dataloader produces a global batch which is turned into a list of microbatches. + The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. """ # Get seq length of batch batch = next(dataloader_iter) @@ -230,15 +227,15 @@ def loss_func(output_tensor): return fwd_output_and_loss_func def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from megatron-core. + No need to call it here. """ return def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. - We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. 
""" return @@ -291,9 +288,9 @@ def get_predictions(self, input_ids, enc_mask, encoder_input, labels): enc_mask=enc_mask, num_tokens_to_generate=self.decoder_seq_length, encoder_input=encoder_input, - bos_id=self.tokenizer.pad_id - if self.cfg.data.get('decoder_starts_with_pad', False) - else self.tokenizer.bos_id, + bos_id=( + self.tokenizer.pad_id if self.cfg.data.get('decoder_starts_with_pad', False) else self.tokenizer.bos_id + ), ) # Special ids to text function to handle stripping and special tokens with sentencepiece tokenizers. preds_text = MegatronT5SFTModel.ids_to_text(predicted_token_ids, self.tokenizer) @@ -385,7 +382,8 @@ def on_validation_epoch_end(self): gather_results_dedup = list(set(itertools.chain(*gather_results))) val_metric_dict = self.validation_metric.get_score( - [i[2] for i in gather_results_dedup], [i[1] for i in gather_results_dedup], + [i[2] for i in gather_results_dedup], + [i[1] for i in gather_results_dedup], ) for metric, val in val_metric_dict.items(): @@ -445,9 +443,9 @@ def build_virtual_prompt_dataset( drop_last=drop_last, num_workers=num_workers, pin_memory=pin_memory, - persistent_workers=True - if num_workers > 0 - else False, # (@adithyare and @eharper) We need to set this to True to get around issues with spawn=True + persistent_workers=( + True if num_workers > 0 else False + ), # (@adithyare and @eharper) We need to set this to True to get around issues with spawn=True ) print('build success', len(dataloader), dataset_paths) return dataset, dataloader @@ -477,9 +475,9 @@ def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: int = 0) -> A enc_mask=enc_mask, num_tokens_to_generate=self.decoder_seq_length, encoder_input=encoder_input, - bos_id=self.tokenizer.pad_id - if self.cfg.data.get('decoder_starts_with_pad', False) - else self.tokenizer.bos_id, + bos_id=( + self.tokenizer.pad_id if self.cfg.data.get('decoder_starts_with_pad', False) else self.tokenizer.bos_id + ), ) # Special ids to text function to handle stripping and special tokens with sentencepiece tokenizers. 
preds_text = MegatronT5SFTModel.ids_to_text(predicted_token_ids, self.tokenizer) @@ -522,7 +520,7 @@ def on_predict_epoch_end(self) -> None: input_prediction_pair = [] correct = 0 - for (input, pred, label) in gather_results_dedup: + for input, pred, label in gather_results_dedup: input_prediction_pair.append((input, pred)) if label: if pred == label: diff --git a/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py b/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py index 2344dac3a64a..e71ed4964c29 100644 --- a/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py +++ b/nemo/collections/nlp/models/language_modeling/megatron_t5_sft_model.py @@ -30,21 +30,11 @@ from nemo.collections.nlp.parts.mixins.nlp_adapter_mixins import NLPAdapterModelMixin from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.utils import AppState, logging - -try: - from apex.transformer.pipeline_parallel.utils import ( - _reconfigure_microbatch_calculator, - get_current_global_batch_size, - get_micro_batch_size, - get_num_microbatches, - ) - - HAVE_APEX = True -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_current_global_batch_size, get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -57,13 +47,9 @@ class MegatronT5SFTModel(NLPAdapterModelMixin, MegatronT5Model): - """ T5 Finetuning model in the same format as MegatronGPTSFTModel """ + """T5 Finetuning model in the same format as MegatronGPTSFTModel""" def __init__(self, cfg: DictConfig, trainer: Trainer): - if not HAVE_APEX: - raise ImportError( - "Apex was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt." - ) super().__init__(cfg, trainer=trainer) self.val_metric = self.test_metric = None if hasattr(self.cfg.data, "validation_ds"): @@ -290,8 +276,8 @@ def _reconfigure_and_process_inference_batch(self, batch, ds_config): def fwd_bwd_step(self, dataloader_iter, forward_only): """ - Dataloader produces a global batch which is turned into a list of microbatches. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Dataloader produces a global batch which is turned into a list of microbatches. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. 
""" # If tuple, 1st element in it is the batch since dataloader_iter returns batch, batch_idx, dataloader_idx batch = next(dataloader_iter) @@ -605,7 +591,13 @@ def on_test_epoch_end(self): # return super().on_test_epoch_end() def build_data_loader( - self, dataset, global_batch_size, shuffle, num_workers, pin_memory, drop_last, + self, + dataset, + global_batch_size, + shuffle, + num_workers, + pin_memory, + drop_last, ): """Buld dataloader given an input dataset.""" @@ -652,9 +644,11 @@ def setup_eval_data(self, datasets, data_cfg): for dataset in datasets: eval_dl = self.build_data_loader( dataset, - global_batch_size=self.cfg.data.test_ds.global_batch_size - if hasattr(self.cfg.data, "test_ds") - else self.cfg.data.validation_ds.global_batch_size, + global_batch_size=( + self.cfg.data.test_ds.global_batch_size + if hasattr(self.cfg.data, "test_ds") + else self.cfg.data.validation_ds.global_batch_size + ), shuffle=data_cfg.shuffle, num_workers=data_cfg.num_workers, pin_memory=data_cfg.pin_memory, diff --git a/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py b/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py index 5a41682a4b5b..6a76f88cd229 100644 --- a/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py +++ b/nemo/collections/nlp/models/machine_translation/megatron_nmt_model.py @@ -51,22 +51,11 @@ from nemo.collections.nlp.parts.utils_funcs import get_last_rank from nemo.core.classes import Exportable from nemo.utils import AppState, logging, timers - -try: - from apex.transformer.pipeline_parallel.utils import ( - _reconfigure_microbatch_calculator, - get_micro_batch_size, - get_num_microbatches, - ) - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -210,17 +199,21 @@ def _build_tokenizer(self): self.encoder_tokenizer, self.decoder_tokenizer = MTEncDecModel.setup_enc_dec_tokenizers( encoder_tokenizer_library=self.encoder_tokenizer_library, encoder_tokenizer_model=encoder_tokenizer_model, - encoder_bpe_dropout=self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) - if self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) is not None - else 0.0, + encoder_bpe_dropout=( + self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) + if self._cfg.encoder_tokenizer.get('bpe_dropout', 0.0) is not None + else 0.0 + ), encoder_model_name=self._cfg.encoder_tokenizer.get('type', None), encoder_r2l=self._cfg.encoder_tokenizer.get('r2l', False), decoder_tokenizer_library=self.decoder_tokenizer_library, encoder_tokenizer_vocab_file=self._cfg.encoder_tokenizer.get('vocab_file', None), decoder_tokenizer_model=decoder_tokenizer_model, - decoder_bpe_dropout=self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) - if self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) is not None - else 0.0, + decoder_bpe_dropout=( + self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) + if self._cfg.decoder_tokenizer.get('bpe_dropout', 0.0) is not None + else 0.0 + ), decoder_model_name=self._cfg.encoder_tokenizer.get('type', None), decoder_r2l=self._cfg.decoder_tokenizer.get('r2l', False), encoder_sentencepiece_legacy=self._cfg.encoder_tokenizer.get('sentencepiece_legacy', False), @@ -252,10 +245,14 @@ def 
_build_vocab(self): f"NMT-XLM objective requires sentencepiece tokenizer, but got decoder tokenizer library : {self.cfg.decoder_tokenizer.library}" ) MegatronT5Model.add_special_tokens_to_tokenizer( - tokenizer=self.encoder_tokenizer, tokenizer_cfg=self.cfg.encoder_tokenizer, dataset_type='ul2', + tokenizer=self.encoder_tokenizer, + tokenizer_cfg=self.cfg.encoder_tokenizer, + dataset_type='ul2', ) MegatronT5Model.add_special_tokens_to_tokenizer( - tokenizer=self.decoder_tokenizer, tokenizer_cfg=self.cfg.decoder_tokenizer, dataset_type='ul2', + tokenizer=self.decoder_tokenizer, + tokenizer_cfg=self.cfg.decoder_tokenizer, + dataset_type='ul2', ) # Set up pre and post processors as well. @@ -277,7 +274,10 @@ def _build_vocab(self): else: # After this call, the model will have self.source_processor and self.target_processor objects self.source_processor, self.target_processor = MTEncDecModel.setup_pre_and_post_processing_utils( - self.src_language, self.tgt_language, self.encoder_tokenizer_library, self.decoder_tokenizer_library, + self.src_language, + self.tgt_language, + self.encoder_tokenizer_library, + self.decoder_tokenizer_library, ) self.multilingual_ids = [None] @@ -289,8 +289,8 @@ def _build_vocab(self): def fwd_bwd_step(self, dataloader_iter, forward_only): """ - Dataloader produces a global batch which is turned into a list of microbatches. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Dataloader produces a global batch which is turned into a list of microbatches. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # If tuple, 1st element in it is the batch since dataloader_iter returns batch, batch_idx, dataloader_idx batch = next(dataloader_iter) @@ -351,13 +351,19 @@ def eval_step(self, dataloader_iter): # Post-process the translations and inputs to log. 
preds = self.postprocess_outputs( - outputs=predicted_tokens_ids, tokenizer=self.decoder_tokenizer, processor=target_processor, + outputs=predicted_tokens_ids, + tokenizer=self.decoder_tokenizer, + processor=target_processor, ) labels = self.postprocess_outputs( - outputs=labels, tokenizer=self.decoder_tokenizer, processor=target_processor, + outputs=labels, + tokenizer=self.decoder_tokenizer, + processor=target_processor, ) encoder_inputs = self.postprocess_outputs( - outputs=tokens_enc, tokenizer=self.encoder_tokenizer, processor=source_processor, + outputs=tokens_enc, + tokenizer=self.encoder_tokenizer, + processor=source_processor, ) loss_dict = { @@ -781,12 +787,12 @@ def build_memmap_dataset_from_config(self, cfg: DictConfig): tgt_file=tgt_file, num_samples=num_samples, prepend_id=multilingual_ids[idx], - src_language=self.src_language - if not isinstance(self.src_language, ListConfig) - else self.src_language[idx], - tgt_language=self.tgt_language - if not isinstance(self.tgt_language, ListConfig) - else self.tgt_language[idx], + src_language=( + self.src_language if not isinstance(self.src_language, ListConfig) else self.src_language[idx] + ), + tgt_language=( + self.tgt_language if not isinstance(self.tgt_language, ListConfig) else self.tgt_language[idx] + ), ) datasets.append(dataset) dataset = BlendableDataset( diff --git a/nemo/collections/nlp/modules/common/megatron/megatron_init.py b/nemo/collections/nlp/modules/common/megatron/megatron_init.py index 55e386bb22e5..a82c56c38092 100644 --- a/nemo/collections/nlp/modules/common/megatron/megatron_init.py +++ b/nemo/collections/nlp/modules/common/megatron/megatron_init.py @@ -16,15 +16,18 @@ import numpy as np import torch +from megatron.core.num_microbatches_calculator import ( + ConstantNumMicroBatchesCalculator, + init_num_microbatches_calculator, +) from nemo.utils import AppState, logging try: from apex.transformer.log_util import set_logging_level - from apex.transformer.microbatches import ConstantNumMicroBatches - from apex.transformer.pipeline_parallel.utils import setup_microbatch_calculator HAVE_APEX = True + except (ImportError, ModuleNotFoundError): HAVE_APEX = False @@ -136,10 +139,10 @@ def initialize_model_parallel_for_nemo( if global_batch_size and micro_batch_size is not None: # TODO: add rampup_batch_size here when we have it implemented - from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR if _GLOBAL_NUM_MICROBATCHES_CALCULATOR is None: - setup_microbatch_calculator( + init_num_microbatches_calculator( rank=global_rank, global_batch_size=global_batch_size, micro_batch_size=micro_batch_size, @@ -147,7 +150,7 @@ def initialize_model_parallel_for_nemo( rampup_batch_size=rampup_batch_size, ) else: - if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatches): + if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatchesCalculator): assert _GLOBAL_NUM_MICROBATCHES_CALCULATOR.current_global_batch_size == global_batch_size assert _GLOBAL_NUM_MICROBATCHES_CALCULATOR.micro_batch_size == micro_batch_size assert _GLOBAL_NUM_MICROBATCHES_CALCULATOR.num_micro_batches == global_batch_size // ( @@ -158,7 +161,8 @@ def initialize_model_parallel_for_nemo( app_state._is_megatron_initialized = True - set_logging_level(apex_transformer_log_level) + if HAVE_APEX: + set_logging_level(apex_transformer_log_level) def _set_random_seed(seed_): diff --git 
a/nemo/collections/nlp/modules/common/megatron/transformer.py b/nemo/collections/nlp/modules/common/megatron/transformer.py index cb23c4a6b1fd..7ef6ec2d91e9 100644 --- a/nemo/collections/nlp/modules/common/megatron/transformer.py +++ b/nemo/collections/nlp/modules/common/megatron/transformer.py @@ -1525,7 +1525,7 @@ def forward( It indicates if the current step in the forward pass is the first in a gradient accumulation cycle. If set, FP8 weights are cached and some minor optimizations are applied to fuse_wgrad_accumulation """ - from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR + from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR num_micro_batches = getattr(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, 'num_micro_batches', 1) diff --git a/nemo/collections/nlp/modules/common/text_generation_strategy.py b/nemo/collections/nlp/modules/common/text_generation_strategy.py index 3b57b3988310..9d05dc5cdba2 100644 --- a/nemo/collections/nlp/modules/common/text_generation_strategy.py +++ b/nemo/collections/nlp/modules/common/text_generation_strategy.py @@ -28,14 +28,7 @@ from nemo.collections.nlp.modules.common.megatron.utils import get_ltor_masks_and_position_ids try: - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - HAVE_APEX = False - -try: + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.transformer.identity_op import IdentityOp from megatron.core.transformer.module import Float16Module as MCoreFloat16Module diff --git a/nemo/collections/nlp/modules/common/text_generation_utils.py b/nemo/collections/nlp/modules/common/text_generation_utils.py index d4b67d3e3783..87e88b61c211 100644 --- a/nemo/collections/nlp/modules/common/text_generation_utils.py +++ b/nemo/collections/nlp/modules/common/text_generation_utils.py @@ -38,15 +38,7 @@ from nemo.collections.nlp.modules.common.text_generation_strategy import model_inference_strategy_dispatcher from nemo.collections.nlp.modules.common.transformer.text_generation import LengthParam, OutputType, SamplingParam from nemo.utils import AppState - -try: - from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False +from nemo.utils.apex_utils import _reconfigure_microbatch_calculator try: from megatron.core import parallel_state, tensor_parallel diff --git a/nemo/collections/nlp/parts/nlp_overrides.py b/nemo/collections/nlp/parts/nlp_overrides.py index b003e310baeb..fbf000de8bbf 100644 --- a/nemo/collections/nlp/parts/nlp_overrides.py +++ b/nemo/collections/nlp/parts/nlp_overrides.py @@ -74,7 +74,6 @@ from nemo.utils.model_utils import ckpt_to_dir, inject_model_parallel_rank, uninject_model_parallel_rank try: - from apex.transformer.pipeline_parallel.utils import get_num_microbatches from nemo.core.optim.distributed_adam import MegatronDistributedFusedAdam from nemo.core.optim.mcore_optim import McoreDistributedOptimizer @@ -105,6 +104,7 @@ optim_state_to_sharding_state, ) from megatron.core.dist_checkpointing.strategies import tensorstore + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.tensor_parallel.layers import param_is_not_tensor_parallel_duplicate from megatron.core.transformer.module import 
Float16Module as MCoreFloat16Module from megatron.core.transformer.transformer_layer import TransformerLayer as MCoreTransformerLayer diff --git a/nemo/collections/vision/models/megatron_vit_classification_models.py b/nemo/collections/vision/models/megatron_vit_classification_models.py index 46788d2c882c..3417b04299dc 100644 --- a/nemo/collections/vision/models/megatron_vit_classification_models.py +++ b/nemo/collections/vision/models/megatron_vit_classification_models.py @@ -40,17 +40,9 @@ from nemo.core.classes.common import PretrainedModelInfo from nemo.utils import logging -try: - from apex.transformer.pipeline_parallel.utils import get_num_microbatches - - HAVE_APEX = True - -except (ImportError, ModuleNotFoundError): - - HAVE_APEX = False - try: from megatron.core import parallel_state + from megatron.core.num_microbatches_calculator import get_num_microbatches from megatron.core.pipeline_parallel.schedules import get_forward_backward_func HAVE_MEGATRON_CORE = True @@ -113,10 +105,6 @@ class MegatronVitClassificationModel(MegatronBaseModel): """Megatron Vision Transformer Model.""" def __init__(self, cfg: DictConfig, trainer: Trainer): - if not HAVE_APEX: - raise ImportError( - "Apex was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt." - ) if not HAVE_MEGATRON_CORE: raise ImportError( "megatron-core was not found. Please see the NeMo README for installation instructions: https://github.com/NVIDIA/NeMo#megatron-gpt." @@ -286,7 +274,10 @@ def fwd_bwd_step(self, dataloader_iter, forward_only): grad_sync_func = None param_sync_func = None if not forward_only and self.with_distributed_adam: - no_sync_func = partial(self._optimizer.no_sync, greedy_grad_copy=self.megatron_amp_O2,) + no_sync_func = partial( + self._optimizer.no_sync, + greedy_grad_copy=self.megatron_amp_O2, + ) grad_sync_func = self.reduce_overlap_gradients param_sync_func = self.sync_overlap_parameters @@ -357,12 +348,12 @@ def initialize_ub_func(self): def training_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - Batch should be a list of microbatches and those microbatches should on CPU. - Microbatches are then moved to GPU during the pipeline. - The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + Batch should be a list of microbatches and those microbatches should on CPU. + Microbatches are then moved to GPU during the pipeline. + The list of microbatches is then piped through the pipeline using Apex fwd/bwd functions. """ # Initialize userbuffer communicators. if self.initialize_ub: @@ -425,20 +416,20 @@ def training_step(self, dataloader_iter): return loss_mean def backward(self, *args, **kwargs): - """ LightningModule hook to do backward. - We want this to do nothing since we run backward in the fwd/bwd functions from apex. - No need to call it here. + """LightningModule hook to do backward. + We want this to do nothing since we run backward in the fwd/bwd functions from apex. + No need to call it here. """ pass def optimizer_zero_grad(self, *args, **kwargs): - """ LightningModule hook to zero grad. 
- We want this to do nothing as we are zeroing grads during the training_step. + """LightningModule hook to zero grad. + We want this to do nothing as we are zeroing grads during the training_step. """ pass def _append_sequence_parallel_module_grads(self, module, grads): - """ Helper method for allreduce_sequence_parallel_gradients""" + """Helper method for allreduce_sequence_parallel_gradients""" for param in module.parameters(): sequence_parallel_param = getattr(param, 'sequence_parallel', False) @@ -450,9 +441,9 @@ def _append_sequence_parallel_module_grads(self, module, grads): grads.append(grad.data) def allreduce_sequence_parallel_gradients(self): - """ All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. - Modified from megatron-lm: - https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 + """All-reduce layernorm parameters across model parallel nodes when sequence parallelism is used. + Modified from megatron-lm: + https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/blob/3f91f09bb2ab32f9904b47f46f19d2fc3f518ed8/megatron/training.py#L425 """ grads = [] @@ -512,10 +503,10 @@ def fwd_output_only_func(batch, model): def validation_step(self, dataloader_iter): """ - Our dataloaders produce a micro-batch and then we fetch - a number of microbatches depending on the global batch size and model parallel size - from the dataloader to produce a list of microbatches. - The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. + Our dataloaders produce a micro-batch and then we fetch + a number of microbatches depending on the global batch size and model parallel size + from the dataloader to produce a list of microbatches. + The list of microbatches is then piped through the pipeline using megatron-core fwd/bwd functions. """ mode = 'test' if self.trainer.testing else 'val' @@ -525,8 +516,10 @@ def validation_step(self, dataloader_iter): loss, accuracy = self.fwd_bwd_step(dataloader_iter, True) - self.validation_step_outputs.append((loss, accuracy)) if mode == 'val' else self.test_step_outputs.append( - (loss, accuracy) + ( + self.validation_step_outputs.append((loss, accuracy)) + if mode == 'val' + else self.test_step_outputs.append((loss, accuracy)) ) return loss, accuracy @@ -569,7 +562,9 @@ def build_train_valid_test_datasets(self): raise ValueError("limit_val_batches must be an integer or float less than or equal to 1.0.") self._train_ds, self._validation_ds = build_train_valid_datasets( - model_cfg=self.cfg, data_path=self.cfg.data.data_path, image_size=(self.cfg.img_h, self.cfg.img_w), + model_cfg=self.cfg, + data_path=self.cfg.data.data_path, + image_size=(self.cfg.img_h, self.cfg.img_w), ) self._test_ds = None @@ -709,16 +704,16 @@ def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: Optional[int] raise NotImplementedError def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any: - """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device - When using pipeline parallelism, we need the global batch to remain on the CPU, - since the memory overhead will be too high when using a large number of microbatches. - Microbatches are transferred from CPU to GPU inside the pipeline. 
@@ -709,16 +704,16 @@ def predict_step(self, batch: Any, batch_idx: int, dataloader_idx: Optional[int]
         raise NotImplementedError
 
     def transfer_batch_to_device(self, batch: Any, device: torch.device, dataloader_idx: int) -> Any:
-        """ PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
-            When using pipeline parallelism, we need the global batch to remain on the CPU,
-            since the memory overhead will be too high when using a large number of microbatches.
-            Microbatches are transferred from CPU to GPU inside the pipeline.
+        """PTL hook: https://pytorch-lightning.readthedocs.io/en/latest/common/lightning_module.html#transfer-batch-to-device
+        When using pipeline parallelism, we need the global batch to remain on the CPU,
+        since the memory overhead will be too high when using a large number of microbatches.
+        Microbatches are transferred from CPU to GPU inside the pipeline.
         """
         return batch
 
     def _validate_trainer(self):
-        """ Certain trainer configurations can break training.
-            Here we try to catch them and raise an error.
+        """Certain trainer configurations can break training.
+        Here we try to catch them and raise an error.
         """
         if self.trainer.accumulate_grad_batches > 1:
             raise ValueError(
diff --git a/nemo/lightning/data.py b/nemo/lightning/data.py
index d83f5ba3b728..809885e75c79 100644
--- a/nemo/lightning/data.py
+++ b/nemo/lightning/data.py
@@ -53,14 +53,14 @@ def setup_microbatch_calculator(
     else:
         init_global_rank = global_rank
 
-    from apex.transformer.microbatches import ConstantNumMicroBatches
-    from apex.transformer.pipeline_parallel.utils import (
+    from megatron.core.num_microbatches_calculator import (
         _GLOBAL_NUM_MICROBATCHES_CALCULATOR,
-        setup_microbatch_calculator,
+        ConstantNumMicroBatchesCalculator,
+        init_num_microbatches_calculator,
     )
 
     if _GLOBAL_NUM_MICROBATCHES_CALCULATOR is None:
-        setup_microbatch_calculator(
+        init_num_microbatches_calculator(
             rank=init_global_rank,
             global_batch_size=global_batch_size,
             micro_batch_size=micro_batch_size,
@@ -68,7 +68,7 @@
             rampup_batch_size=rampup_batch_size,
         )
     else:
-        if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatches):
+        if isinstance(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, ConstantNumMicroBatchesCalculator):
             assert _GLOBAL_NUM_MICROBATCHES_CALCULATOR.current_global_batch_size == global_batch_size
             assert _GLOBAL_NUM_MICROBATCHES_CALCULATOR.micro_batch_size == micro_batch_size
             assert _GLOBAL_NUM_MICROBATCHES_CALCULATOR.num_micro_batches == global_batch_size // (
diff --git a/nemo/lightning/megatron_parallel.py b/nemo/lightning/megatron_parallel.py
index b68950d561a3..0c141e82ab52 100644
--- a/nemo/lightning/megatron_parallel.py
+++ b/nemo/lightning/megatron_parallel.py
@@ -151,6 +151,7 @@ def __init__(
         convert_module_fn: Optional[Callable[[ModelT], nn.Module]] = None,
     ) -> None:
         from megatron.core import parallel_state
+        from megatron.core.tensor_parallel import set_defaults_if_not_set_tensor_model_parallel_attributes
 
         _pipeline: List[nn.Module]
         if isinstance(pipeline, nn.ModuleList):
diff --git a/nemo/lightning/pytorch/optim/megatron.py b/nemo/lightning/pytorch/optim/megatron.py
index 7faa53f32b65..1eb5290652a4 100644
--- a/nemo/lightning/pytorch/optim/megatron.py
+++ b/nemo/lightning/pytorch/optim/megatron.py
@@ -1,3 +1,4 @@
+import inspect
 from typing import Callable, List, Optional
 
 import pytorch_lightning as pl
@@ -92,8 +93,12 @@ def sharded_state_dict(
         is_loading=False,
         sharding_type='fully_sharded_model_space',
     ):
+        mcore_optimizer_sig = inspect.signature(self.mcore_optimizer.sharded_state_dict).parameters
+        distrib_optim_kwargs = {}
+        if "sharding_type" in mcore_optimizer_sig:
+            distrib_optim_kwargs["sharding_type"] = sharding_type
         state_dict = self.mcore_optimizer.sharded_state_dict(
-            model_sharded_state_dict, is_loading=is_loading, sharding_type=sharding_type
+            model_sharded_state_dict, is_loading=is_loading, **distrib_optim_kwargs
         )
         return state_dict
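The sharded_state_dict change above forwards sharding_type only when the installed megatron-core optimizer accepts it. A standalone illustration of that signature probe; both callees below are hypothetical stand-ins for an older and a newer mcore API, not functions from this patch:

import inspect


def sharded_state_dict_old(model_state, is_loading=False):
    # Hypothetical older API: no sharding_type parameter.
    return {"is_loading": is_loading}


def sharded_state_dict_new(model_state, is_loading=False, sharding_type="dp_zero_gather_scatter"):
    # Hypothetical newer API: sharding_type is accepted.
    return {"is_loading": is_loading, "sharding_type": sharding_type}


def call_sharded_state_dict(fn, model_state, sharding_type="fully_sharded_model_space"):
    kwargs = {}
    if "sharding_type" in inspect.signature(fn).parameters:
        kwargs["sharding_type"] = sharding_type
    return fn(model_state, is_loading=False, **kwargs)


print(call_sharded_state_dict(sharded_state_dict_old, {}))  # kwarg silently omitted
print(call_sharded_state_dict(sharded_state_dict_new, {}))  # kwarg forwarded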
diff --git a/nemo/lightning/pytorch/plugins/data_sampler.py b/nemo/lightning/pytorch/plugins/data_sampler.py
index 378375e3bc0c..8d023d3bb574 100644
--- a/nemo/lightning/pytorch/plugins/data_sampler.py
+++ b/nemo/lightning/pytorch/plugins/data_sampler.py
@@ -62,7 +62,7 @@ def compute_consumed_samples(self, steps_since_resume=0) -> int:
         app_state = AppState()
 
         if self.rampup_batch_size is not None:
-            from apex.transformer.pipeline_parallel.utils import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
+            from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
 
             current_global_batch_size = getattr(_GLOBAL_NUM_MICROBATCHES_CALCULATOR, "current_global_batch_size", 1)
             consumed_samples = self.prev_consumed_samples + self.if_first_step * current_global_batch_size
@@ -85,7 +85,7 @@ def on_megatron_step_start(self, trainer: pl.Trainer) -> None:
             trainer.should_stop = True
 
     def on_megatron_step_end(self, trainer: pl.Trainer, pl_module: pl.LightningModule) -> None:
-        import apex.transformer.pipeline_parallel.utils
+        from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
 
         if self.rampup_batch_size is None:
             return
@@ -105,9 +105,7 @@ def on_megatron_step_end(self, trainer: pl.Trainer, pl_module: pl.LightningModul
 
         self.prev_consumed_samples = consumed_samples
 
-        num_microbatch_calculator = (
-            apex.transformer.pipeline_parallel.utils._GLOBAL_NUM_MICROBATCHES_CALCULATOR  # noqa: SLF001
-        )
+        num_microbatch_calculator = _GLOBAL_NUM_MICROBATCHES_CALCULATOR  # noqa: SLF001
 
         num_microbatch_calculator.update(
             consumed_samples=consumed_samples,
@@ -133,17 +131,15 @@ def megatron_data_kwargs(self) -> Dict[str, Any]:
 
     @property
     def num_microbatches(self) -> int:
-        from apex.transformer.pipeline_parallel.utils import get_num_microbatches
+        from megatron.core.num_microbatches_calculator import get_num_microbatches
 
         return get_num_microbatches()
 
     @property
     def current_global_batch_size(self) -> int:
-        import apex.transformer.pipeline_parallel.utils
+        from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
 
-        num_microbatch_calculator = (
-            apex.transformer.pipeline_parallel.utils._GLOBAL_NUM_MICROBATCHES_CALCULATOR  # noqa: SLF001
-        )
+        num_microbatch_calculator = _GLOBAL_NUM_MICROBATCHES_CALCULATOR  # noqa: SLF001
         current_global_batch_size = num_microbatch_calculator.current_global_batch_size
         return current_global_batch_size
diff --git a/nemo/utils/apex_utils.py b/nemo/utils/apex_utils.py
new file mode 100644
index 000000000000..b3b57a175287
--- /dev/null
+++ b/nemo/utils/apex_utils.py
@@ -0,0 +1,25 @@
+import warnings
+from typing import List, Optional
+
+import torch
+
+
+def _reconfigure_microbatch_calculator(
+    rank: int,
+    rampup_batch_size: Optional[List[int]],
+    global_batch_size: int,
+    micro_batch_size: int,
+    data_parallel_size: int,
+) -> None:
+
+    import megatron.core.num_microbatches_calculator as mb_calculator
+
+    mb_calculator._GLOBAL_NUM_MICROBATCHES_CALCULATOR = mb_calculator.build_num_microbatches_calculator(
+        rank, rampup_batch_size, global_batch_size, micro_batch_size, data_parallel_size
+    )
+
+
+def get_micro_batch_size():
+    from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
+
+    return _GLOBAL_NUM_MICROBATCHES_CALCULATOR.micro_batch_size
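The new nemo/utils/apex_utils.py shim above keeps the old Apex-style entry points while delegating to megatron-core. A usage sketch with illustrative sizes (it assumes megatron-core is installed; the values are not from this patch):

from nemo.utils.apex_utils import _reconfigure_microbatch_calculator, get_micro_batch_size

# Rebuild the global calculator, for example before switching batch sizes for inference.
_reconfigure_microbatch_calculator(
    rank=0,
    rampup_batch_size=None,
    global_batch_size=4,
    micro_batch_size=1,
    data_parallel_size=1,
)

print(get_micro_batch_size())  # -> 1 with the sizes chosen above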
diff --git a/tests/collections/nlp/test_rampup_batch_size.py b/tests/collections/nlp/test_rampup_batch_size.py
index 803afa35168b..fea61571e70f 100644
--- a/tests/collections/nlp/test_rampup_batch_size.py
+++ b/tests/collections/nlp/test_rampup_batch_size.py
@@ -16,30 +16,22 @@
 import pytest
 import torch
+from megatron.core.num_microbatches_calculator import get_num_microbatches
 from omegaconf import DictConfig
 from pytorch_lightning import Trainer
-
 from nemo.collections.nlp.models.language_modeling.megatron_gpt_model import MegatronGPTModel
 from nemo.collections.nlp.parts.nlp_overrides import NLPDDPStrategy
 
-try:
-    import apex.transformer.pipeline_parallel.utils
-    from apex.transformer.pipeline_parallel.utils import get_num_microbatches
-
-    HAVE_APEX = True
-
-except (ImportError, ModuleNotFoundError):
-
-    HAVE_APEX = False
-
 DEVICE_CAPABILITY = None
 if torch.cuda.is_available():
     DEVICE_CAPABILITY = torch.cuda.get_device_capability()
 
 
 def reset_microbatch_calculator():
-    apex.transformer.pipeline_parallel.utils._GLOBAL_NUM_MICROBATCHES_CALCULATOR = None
+    import megatron.core.num_microbatches_calculator as mb
+
+    mb._GLOBAL_NUM_MICROBATCHES_CALCULATOR = None
 
 
 @pytest.fixture()
@@ -173,7 +165,9 @@ def test_rampup_bs(self, gpt_model, rampup_batch_size):
 
     @pytest.mark.unit
     def test_rampup_bs_schedule(self, gpt_model, trainer_cfg, rampup_batch_size_schedule):
-        num_microbatch_calculator = apex.transformer.pipeline_parallel.utils._GLOBAL_NUM_MICROBATCHES_CALCULATOR
+        from megatron.core.num_microbatches_calculator import _GLOBAL_NUM_MICROBATCHES_CALCULATOR
+
+        num_microbatch_calculator = _GLOBAL_NUM_MICROBATCHES_CALCULATOR
         micro_batch_size = gpt_model.cfg.micro_batch_size
         num_devices = trainer_cfg["devices"]
         num_nodes = trainer_cfg["num_nodes"]
diff --git a/tutorials/nlp/lora.ipynb b/tutorials/nlp/lora.ipynb
index b878b9cfe453..a8e3138ba8ef 100644
--- a/tutorials/nlp/lora.ipynb
+++ b/tutorials/nlp/lora.ipynb
@@ -723,7 +723,7 @@
    "execution_count": null,
    "outputs": [],
    "source": [
-    "from apex.transformer.pipeline_parallel.utils import _reconfigure_microbatch_calculator\n",
+    "from nemo.utils.apex_utils import _reconfigure_microbatch_calculator\n",
     "_reconfigure_microbatch_calculator(\n",
     "    rank=0,\n",
     "    rampup_batch_size=None,\n",
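Both the updated ramp-up test and the data sampler above read the global calculator from megatron-core. A rough sketch of the ramp-up flow they exercise; the schedule numbers are illustrative, the update call mirrors the data-sampler usage shown earlier, and the exact ramp-up semantics belong to megatron-core rather than to this patch:

import megatron.core.num_microbatches_calculator as mb_calculator
from megatron.core.num_microbatches_calculator import (
    get_num_microbatches,
    init_num_microbatches_calculator,
)

mb_calculator._GLOBAL_NUM_MICROBATCHES_CALCULATOR = None  # reset, as the test fixture does

# Illustrative schedule: start at global batch 4, grow by 4 over 100 consumed samples, target 16.
init_num_microbatches_calculator(
    rank=0,
    rampup_batch_size=[4, 4, 100],
    global_batch_size=16,
    micro_batch_size=2,
    data_parallel_size=1,
)

calculator = mb_calculator._GLOBAL_NUM_MICROBATCHES_CALCULATOR
calculator.update(consumed_samples=50, consistency_check=False)  # advance the ramp-up
print(calculator.current_global_batch_size, get_num_microbatches())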