Merge pull request #359 from srivatsankrishnan/llama3_8b_dse
Initial DSE parameters for llama3_8b
srivatsankrishnan authored Feb 12, 2025
2 parents 217b13a + 03e298e commit ba115d7
Showing 12 changed files with 246 additions and 77 deletions.
11 changes: 6 additions & 5 deletions conf/common/test/dse_nemo_run_llama3_8b.toml
@@ -19,22 +19,23 @@ description = "dse_nemo_run_llama3_8b"
 test_template_name = "NeMoRun"

 [cmd_args]
-docker_image_url = "nvcr.io/nvidia/nemo:24.09"
+docker_image_url = "nvcr.io/nvidia/nemo:24.12.rc3"
 task = "pretrain"
 recipe_name = "llama3_8b"

 [cmd_args.data]
-micro_batch_size = [1, 2, 4]
+micro_batch_size = [1]
+global_batch_size = [128, 256, 512]

 [cmd_args.trainer]
-max_steps = 5
+max_steps = 100
 val_check_interval = 1000
 num_nodes = 1

 [cmd_args.trainer.strategy]
 tensor_model_parallel_size = [1, 2, 4]
-pipeline_model_parallel_size = 1
-expert_model_parallel_size = 1
+pipeline_model_parallel_size = [1, 2, 4]
+context_parallel_size = [1, 2]

 [cmd_args.log.ckpt]
 save_on_train_epoch_end = false
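The list-valued fields above (micro_batch_size, global_batch_size, and the three parallelism sizes) define the DSE search space: each concrete run picks one value per list. A minimal sketch of how such a grid enumerates, assuming simple exhaustive expansion rather than CloudAI's actual agent logic:

    # Illustration only: how the list-valued TOML fields above span a DSE grid.
    from itertools import product

    sweep = {
        "data.global_batch_size": [128, 256, 512],
        "trainer.strategy.tensor_model_parallel_size": [1, 2, 4],
        "trainer.strategy.pipeline_model_parallel_size": [1, 2, 4],
        "trainer.strategy.context_parallel_size": [1, 2],
    }

    # Every combination of one value per list is a candidate action/config.
    combos = [dict(zip(sweep, values)) for values in product(*sweep.values())]
    print(len(combos))  # 3 * 3 * 3 * 2 = 54 candidate configurations

With micro_batch_size pinned to [1], the sweep yields 54 combinations before the constraint check added in this PR prunes infeasible parallelism layouts.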
51 changes: 51 additions & 0 deletions conf/common/test/dse_nemo_run_llama3_8b_fp8.toml
@@ -0,0 +1,51 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name = "dse_nemo_run_llama3_8b_fp8"
description = "dse_nemo_run_llama3_8b_fp8"
test_template_name = "NeMoRun"

[cmd_args]
docker_image_url = "nvcr.io/nvidia/nemo:24.12.rc3"
task = "pretrain"
recipe_name = "llama3_8b"
num_layers = 32

[cmd_args.data]
micro_batch_size = [1]
global_batch_size = [128, 256, 512]

[cmd_args.trainer]
max_steps = 100
val_check_interval = 1000
num_nodes = 1

[cmd_args.trainer.strategy]
tensor_model_parallel_size = [1, 2, 4]
pipeline_model_parallel_size = [1, 2, 4]
context_parallel_size = [1, 2]

[cmd_args.trainer.plugins]
fp8 = "hybrid"
fp8_margin = 0
fp8_amax_history_len = 1024
fp8_amax_compute_algo = "max"
fp8_params = true
grad_reduce_in_fp32 = true

[cmd_args.log.ckpt]
save_on_train_epoch_end = false
save_last = false
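The [cmd_args.trainer.plugins] table corresponds one-to-one to the Plugin model added to src/cloudai/test_definitions/nemo_run.py later in this diff. A small sketch of that mapping; because the model sets extra="forbid", a misspelled key fails validation instead of being silently dropped:

    # Sketch only: Plugin is the pydantic model added below in nemo_run.py.
    from cloudai.test_definitions.nemo_run import Plugin

    plugin = Plugin(
        fp8="hybrid",
        fp8_margin=0,
        fp8_amax_history_len=1024,
        fp8_amax_compute_algo="max",
        fp8_params=True,
        grad_reduce_in_fp32=True,
    )

    # extra="forbid" rejects unknown keys at validation time:
    # Plugin(fp8_margn=0)  -> pydantic.ValidationError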
32 changes: 32 additions & 0 deletions src/cloudai/_core/configurator/cloudai_gym.py
@@ -15,6 +15,8 @@
 # limitations under the License.

 import asyncio
+import csv
+import logging
 from pathlib import Path
 from typing import Any, Dict, Optional, Tuple

@@ -116,12 +118,20 @@ def step(self, action: Any) -> Tuple[list, float, bool, dict]:
         """
         for key, value in action.items():
             self.update_nested_attr(self.test_run.test.test_definition.cmd_args, key, value)
+
+        if not self.test_run.test.test_definition.constraint_check:
+            logging.info("Constraint check failed. Skipping step.")
+            return [-1.0], -1.0, True, {}
+        logging.info(f"Running step {self.test_run.current_iteration} with action {action}")
         asyncio.run(self.runner.run())

         observation = self.get_observation(action)
         reward = self.compute_reward(observation)
         done = False
         info = {}
+
+        self.write_trajectory(self.test_run.current_iteration, action, reward, observation)
+
         return observation, reward, done, info

     def render(self, mode: str = "human"):
@@ -209,3 +219,25 @@ def update_nested_attr(self, obj, attr_path, value):
             else:
                 raise AttributeError(f"{type(obj).__name__!r} object has no attribute {attr!r}")
         setattr(obj, attrs[-1], value)
+
+    def write_trajectory(self, step: int, action: Any, reward: float, observation: list):
+        """
+        Write the trajectory to a CSV file.
+
+        Args:
+            step (int): The current step number.
+            action (Any): The action taken by the agent.
+            reward (float): The reward received for the action.
+            observation (list): The observation after taking the action.
+        """
+        output_path = self.runner.runner.system.output_path / self.runner.runner.test_scenario.name
+        subdir = next(output_path.iterdir())
+        trajectory_file_path = subdir / f"{self.test_run.current_iteration}" / "trajectory.csv"
+
+        file_exists = trajectory_file_path.exists()
+
+        with open(trajectory_file_path, mode="a", newline="") as file:
+            writer = csv.writer(file)
+            if not file_exists:
+                writer.writerow(["step", "action", "reward", "observation"])
+            writer.writerow([step, action, reward, observation])
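write_trajectory appends one row per step and writes the header only when the file does not yet exist, so repeated steps accumulate in the same trajectory.csv under the iteration's directory. A minimal sketch of reading the log back for analysis; the path is illustrative, and because csv stringifies the action dict and observation list, numeric fields need explicit casts:

    import csv
    from pathlib import Path

    # Illustrative location; the real layout is <output>/<scenario>/<subdir>/<iteration>/trajectory.csv.
    trajectory = Path("results/dse_scenario/0/trajectory.csv")

    with trajectory.open(newline="") as f:
        rows = list(csv.DictReader(f))

    # Pick the step with the highest reward; reward was written as text.
    best = max(rows, key=lambda r: float(r["reward"]))
    print(best["step"], best["action"], best["reward"])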
4 changes: 4 additions & 0 deletions src/cloudai/_core/test.py
@@ -112,3 +112,7 @@ def extra_args_str(self) -> str:
     @abstractmethod
     def installables(self) -> list[Installable]:
        return []
+
+    @property
+    def constraint_check(self) -> bool:
+        return True
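The base-class default of True means tests that define no constraints always run; a subclass overrides the property to veto infeasible actions before the gym launches them (see step() above). The override pattern, in a self-contained sketch with illustrative names:

    class Base:
        @property
        def constraint_check(self) -> bool:
            return True  # default: every configuration is considered feasible


    class Constrained(Base):
        def __init__(self, global_batch_size: int, micro_batch_size: int):
            self.global_batch_size = global_batch_size
            self.micro_batch_size = micro_batch_size

        @property
        def constraint_check(self) -> bool:
            # Reject configs whose global batch is not a multiple of the micro batch.
            return self.global_batch_size % self.micro_batch_size == 0


    print(Constrained(256, 3).constraint_check)  # False -> step() would skip this action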
@@ -41,25 +41,33 @@ def generate_report(self, test_name: str, directory_path: Path, sol: Optional[float]
             return

         train_step_timings = []
+        step_timings = []

         with open(stdout_file, "r") as f:
             for line in f:
                 if "train_step_timing in s:" in line:
                     try:
-                        timing = float(line.strip().split(":")[-1])
+                        timing = float(line.split("train_step_timing in s:")[1].strip().split()[0])
                         train_step_timings.append(timing)
-                    except ValueError:
+                        if "global_step:" in line:
+                            global_step = int(line.split("global_step:")[1].split("|")[0].strip())
+                            if 80 <= global_step <= 100:
+                                step_timings.append(timing)
+                    except (ValueError, IndexError):
                         continue

         if not train_step_timings:
             logging.error(f"No train_step_timing found in {stdout_file}")
             return

+        if len(step_timings) < 20:
+            step_timings = train_step_timings[1:]
+
         stats = {
-            "avg": np.mean(train_step_timings),
-            "median": np.median(train_step_timings),
-            "min": np.min(train_step_timings),
-            "max": np.max(train_step_timings),
+            "avg": np.mean(step_timings),
+            "median": np.median(step_timings),
+            "min": np.min(step_timings),
+            "max": np.max(step_timings),
         }

         summary_file = directory_path / "report.txt"
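The reworked parser keys both metrics off the same log line, so a step timing is attributed to steps 80-100 only when the line also carries a global_step field; if fewer than 20 such samples exist, it falls back to all timings except the warmup step. A sketch of the extraction on a fabricated log line (the real NeMo stdout format may differ):

    # Fabricated example line; only the two "key: value" fragments matter here.
    line = "Epoch 0: ... global_step: 85 | train_step_timing in s: 1.42"

    timing = float(line.split("train_step_timing in s:")[1].strip().split()[0])  # 1.42
    global_step = int(line.split("global_step:")[1].split("|")[0].strip())       # 85

    keep = 80 <= global_step <= 100  # True: counted toward the reported stats
    print(timing, global_step, keep)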
@@ -15,6 +15,8 @@
 # limitations under the License.


+import logging
+import sys
 from typing import Any, Dict, List, Union, cast

 from cloudai import TestRun
@@ -51,10 +53,11 @@ def flatten_dict(self, d: dict[str, str], parent_key: str = "", sep: str = "."):
     def append_flattened_dict(self, prefix: str, d: Dict[str, Any], command: List[str]):
         flattened = self.flatten_dict(d)
         for key, value in flattened.items():
-            if prefix:
-                command.append(f"{prefix}.{key}={value}")
-            else:
-                command.append(f"{key}={value}")
+            if value is not None:
+                if prefix:
+                    command.append(f"{prefix}.{key}={value}")
+                else:
+                    command.append(f"{key}={value}")

     def generate_test_command(
         self, env_vars: Dict[str, str], cmd_args: Dict[str, Union[str, List[str]]], tr: TestRun
@@ -64,13 +67,23 @@ def generate_test_command(
         cmd_args_dict = tdef.cmd_args.model_dump()

         cmd_args_dict.pop("docker_image_url")
+        cmd_args_dict.pop("num_layers")

         command = ["nemo", "llm", cmd_args_dict.pop("task"), "--factory", cmd_args_dict.pop("recipe_name"), "-y"]

-        if tr.nodes:
-            command.append(f"trainer.num_nodes={len(self.system.parse_nodes(tr.nodes))}")
-        elif tr.num_nodes > 0:
-            command.append(f"trainer.num_nodes={tr.num_nodes}")
+        num_nodes = len(self.system.parse_nodes(tr.nodes)) if tr.nodes else tr.num_nodes
+
+        if cmd_args_dict["trainer"]["num_nodes"] and cmd_args_dict["trainer"]["num_nodes"] > num_nodes:
+            err = (
+                f"Mismatch in num_nodes: {num_nodes} vs {cmd_args_dict['trainer']['num_nodes']}. "
+                "trainer.num_nodes should be less than or equal to the number of nodes specified "
+                "in the test scenario."
+            )
+
+            logging.error(err)
+            sys.exit(1)
+
+        cmd_args_dict["trainer"]["num_nodes"] = num_nodes

         self.append_flattened_dict("", cmd_args_dict, command)

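append_flattened_dict turns the nested cmd_args model dump into dot-separated overrides appended after the nemo llm invocation, and the new None guard keeps unset optional fields (for example a missing plugins block) off the command line. A self-contained sketch of that behavior, mirroring rather than importing the strategy above:

    from typing import Any, Dict, List

    def flatten(d: Dict[str, Any], parent: str = "", sep: str = ".") -> Dict[str, Any]:
        # Recursively collapse nested dicts into dot-separated keys.
        out: Dict[str, Any] = {}
        for k, v in d.items():
            key = f"{parent}{sep}{k}" if parent else k
            if isinstance(v, dict):
                out.update(flatten(v, key, sep))
            else:
                out[key] = v
        return out

    cmd_args = {
        "trainer": {
            "num_nodes": 1,
            "strategy": {"tensor_model_parallel_size": 2},
            "plugins": None,  # unset optional block: skipped below
        }
    }
    command: List[str] = ["nemo", "llm", "pretrain", "--factory", "llama3_8b", "-y"]
    command += [f"{k}={v}" for k, v in flatten(cmd_args).items() if v is not None]
    print(" ".join(command))
    # nemo llm pretrain --factory llama3_8b -y trainer.num_nodes=1 trainer.strategy.tensor_model_parallel_size=2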
51 changes: 47 additions & 4 deletions src/cloudai/test_definitions/nemo_run.py
@@ -14,17 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from typing import List, Optional, Union
+from typing import List, Optional, Union, cast

-from pydantic import BaseModel, Field
+from pydantic import BaseModel, ConfigDict, Field

 from cloudai import CmdArgs, DockerImage, Installable, TestDefinition


+class Plugin(BaseModel):
+    """Plugin configuration for NeMoRun."""
+
+    fp8: Optional[str] = None
+    fp8_margin: Optional[int] = None
+    fp8_amax_history_len: Optional[int] = None
+    fp8_amax_compute_algo: Optional[str] = None
+    fp8_wgrad: Optional[bool] = None
+    fp8_params: Optional[bool] = None
+    grad_reduce_in_fp32: Optional[bool] = None
+
+    model_config = ConfigDict(extra="forbid")
+
+
 class Data(BaseModel):
     """Data configuration for NeMoRun."""

     micro_batch_size: Union[int, List[int]] = 1
+    global_batch_size: Union[int, List[int]] = 1
+
+    model_config = ConfigDict(extra="forbid")


 class TrainerStrategy(BaseModel):
@@ -35,14 +52,19 @@ class TrainerStrategy(BaseModel):
     context_parallel_size: Union[int, List[int]] = 2
     virtual_pipeline_model_parallel_size: Optional[Union[int, List[int]]] = None

+    model_config = ConfigDict(extra="forbid")
+

 class Trainer(BaseModel):
     """Trainer configuration for NeMoRun."""

-    max_steps: Union[int, List[int]] = 1168251
+    max_steps: Union[int, List[int]] = 100
     val_check_interval: Union[int, List[int]] = 1000
-    num_nodes: Union[int, List[int]] = 1
+    num_nodes: Optional[Union[int, List[int]]] = None
     strategy: TrainerStrategy = Field(default_factory=TrainerStrategy)
+    plugins: Optional[Plugin] = None
+
+    model_config = ConfigDict(extra="forbid")


 class LogCkpt(BaseModel):
@@ -51,19 +73,24 @@ class LogCkpt(BaseModel):
     save_on_train_epoch_end: bool = Field(default=False)
     save_last: bool = Field(default=False)

+    model_config = ConfigDict(extra="forbid")
+

 class Log(BaseModel):
     """Base logging configuration for NeMoRun."""

     ckpt: LogCkpt = Field(default_factory=LogCkpt)

+    model_config = ConfigDict(extra="forbid")
+

 class NeMoRunCmdArgs(CmdArgs):
     """NeMoRun test command arguments."""

     docker_image_url: str
     task: str
     recipe_name: str
+    num_layers: Optional[int] = None
     trainer: Trainer = Field(default_factory=Trainer)
     log: Log = Field(default_factory=Log)
     data: Data = Field(default_factory=Data)
@@ -85,3 +112,19 @@ def docker_image(self) -> DockerImage:
     def installables(self) -> list[Installable]:
         """Get list of installable objects."""
         return [self.docker_image]
+
+    @property
+    def constraint_check(self) -> bool:
+        """Check constraints for NeMoRun."""
+        tp = cast(int, self.cmd_args.trainer.strategy.tensor_model_parallel_size)
+        pp = cast(int, self.cmd_args.trainer.strategy.pipeline_model_parallel_size)
+        cp = cast(int, self.cmd_args.trainer.strategy.context_parallel_size)
+        vp = cast(Optional[int], self.cmd_args.trainer.strategy.virtual_pipeline_model_parallel_size)
+        num_nodes = cast(int, self.cmd_args.trainer.num_nodes)
+        num_gpus = num_nodes * 8
+        num_layers = cast(int, self.cmd_args.num_layers)
+
+        constraint1 = num_gpus % (tp * pp * cp) == 0
+        constraint2 = True if vp is None else (num_layers // pp) % vp == 0
+
+        return constraint1 and constraint2
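constraint_check assumes 8 GPUs per node and requires the parallelism product to divide the GPU count, plus a layer-divisibility condition when virtual pipelining is enabled. A worked instance with illustrative values:

    # Values chosen for illustration; with one 8-GPU node, tp*pp*cp = 16 > 8 GPUs.
    tp, pp, cp, vp = 4, 2, 2, None
    num_nodes, num_layers = 1, 32

    num_gpus = num_nodes * 8                      # 8, assuming 8 GPUs per node
    constraint1 = num_gpus % (tp * pp * cp) == 0  # 8 % 16 == 8 -> False
    constraint2 = True if vp is None else (num_layers // pp) % vp == 0

    print(constraint1 and constraint2)  # False: step() returns the failure sentinel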
2 changes: 1 addition & 1 deletion tests/ref_data/nemo-run-no-hook.sbatch
@@ -8,4 +8,4 @@
 export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1)


-srun --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results nemo llm pretrain --factory llama_3b -y trainer.num_nodes=1 trainer.max_steps=1168251 trainer.val_check_interval=1000 trainer.num_nodes=1 trainer.strategy.tensor_model_parallel_size=1 trainer.strategy.pipeline_model_parallel_size=1 trainer.strategy.context_parallel_size=2 trainer.strategy.virtual_pipeline_model_parallel_size=None log.ckpt.save_on_train_epoch_end=False log.ckpt.save_last=False data.micro_batch_size=1
+srun --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results nemo llm pretrain --factory llama_3b -y trainer.max_steps=100 trainer.val_check_interval=1000 trainer.num_nodes=1 trainer.strategy.tensor_model_parallel_size=1 trainer.strategy.pipeline_model_parallel_size=1 trainer.strategy.context_parallel_size=2 log.ckpt.save_on_train_epoch_end=False log.ckpt.save_last=False data.micro_batch_size=1 data.global_batch_size=1
2 changes: 1 addition & 1 deletion tests/ref_data/nemo-run-pre-test.sbatch
@@ -12,5 +12,5 @@ srun --output=__OUTPUT_DIR__/output/pre_test/nccl/stdout.txt --error=__OUTPUT_DI
 SUCCESS_0=$(grep -q "Avg bus bandwidth" __OUTPUT_DIR__/output/pre_test/nccl/stdout.txt && echo 1 || echo 0)
 PRE_TEST_SUCCESS=$( [ $SUCCESS_0 -eq 1 ] && echo 1 || echo 0 )
 if [ $PRE_TEST_SUCCESS -eq 1 ]; then
-    srun --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results nemo llm pretrain --factory llama_3b -y trainer.num_nodes=1 trainer.max_steps=1168251 trainer.val_check_interval=1000 trainer.num_nodes=1 trainer.strategy.tensor_model_parallel_size=1 trainer.strategy.pipeline_model_parallel_size=1 trainer.strategy.context_parallel_size=2 trainer.strategy.virtual_pipeline_model_parallel_size=None log.ckpt.save_on_train_epoch_end=False log.ckpt.save_last=False data.micro_batch_size=1
+    srun --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results nemo llm pretrain --factory llama_3b -y trainer.max_steps=100 trainer.val_check_interval=1000 trainer.num_nodes=1 trainer.strategy.tensor_model_parallel_size=1 trainer.strategy.pipeline_model_parallel_size=1 trainer.strategy.context_parallel_size=2 log.ckpt.save_on_train_epoch_end=False log.ckpt.save_last=False data.micro_batch_size=1 data.global_batch_size=1
 fi