Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throughput timer configuration #5363

Merged
merged 26 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5132e44
Add throughput timer configuration
deepcharm Apr 4, 2024
9c20ef9
Merge branch 'master' into add-timers-configuration
loadams Apr 5, 2024
bcdded8
Merge branch 'master' into add-timers-configuration
loadams Apr 15, 2024
0b4ae6f
Merge branch 'microsoft:master' into add-timers-configuration
deepcharm Apr 18, 2024
69bd7d2
Use DeepSpeedConfigModel for timers configuration
deepcharm Apr 18, 2024
a938f50
Removed constants.py, not needed anymore
deepcharm Apr 18, 2024
4b0fc67
Merge branch 'master' into add-timers-configuration
loadams Apr 19, 2024
45dcdb7
Merge branch 'master' into add-timers-configuration
deepcharm Apr 21, 2024
599b5da
Fixed pre-commit checks
deepcharm Apr 21, 2024
08514c0
Merge branch 'master' into add-timers-configuration
deepcharm Apr 24, 2024
1ff04d7
Merge branch 'master' into add-timers-configuration
loadams Apr 25, 2024
d914694
Merge branch 'master' into add-timers-configuration
loadams Apr 25, 2024
8c6c20c
Merge branch 'master' into add-timers-configuration
loadams Apr 29, 2024
d1626dc
Merge branch 'master' into add-timers-configuration
deepcharm Apr 30, 2024
ad8e1c4
Merge branch 'master' into add-timers-configuration
deepcharm May 5, 2024
8c1b536
Merge branch 'master' into add-timers-configuration
tjruwase May 7, 2024
fc8092e
Merge branch 'master' into add-timers-configuration
tjruwase May 13, 2024
4079392
Merge branch 'master' into add-timers-configuration
loadams May 13, 2024
0341f48
Merge branch 'master' into add-timers-configuration
loadams May 15, 2024
e6d71e5
Merge branch 'master' into add-timers-configuration
loadams May 16, 2024
091cc25
Merge branch 'master' into add-timers-configuration
loadams May 17, 2024
9bb53a0
Merge branch 'master' into add-timers-configuration
tjruwase May 20, 2024
7b1aa59
Merge branch 'master' into add-timers-configuration
tjruwase May 20, 2024
c8ac5a4
Merge branch 'master' into add-timers-configuration
loadams May 20, 2024
ceb1809
Merge branch 'master' into add-timers-configuration
loadams May 21, 2024
53b8240
Merge branch 'master' into add-timers-configuration
loadams May 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deepspeed/runtime/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
from .data_pipeline.config import get_data_efficiency_enabled, get_data_efficiency_config, get_curriculum_enabled_legacy, get_curriculum_params_legacy
from .data_pipeline.constants import *

from ..utils.config import DeepSpeedThroughputTimerConfig

TENSOR_CORE_ALIGN_SIZE = 8

ADAGRAD_OPTIMIZER = 'adagrad'
Expand Down Expand Up @@ -911,6 +913,8 @@ def _initialize_params(self, param_dict):

self.compile_config = get_compile_config(param_dict)

self.timers_config = DeepSpeedThroughputTimerConfig(param_dict)

def _batch_assertion(self):

train_batch = self.train_batch_size
Expand Down
9 changes: 4 additions & 5 deletions deepspeed/runtime/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,10 @@ def __init__(self,
# Configure wall clock timers
self.timers = SynchronizedWallClockTimer()
# Throughput timer
self.tput_timer = ThroughputTimer(
batch_size=self.train_batch_size(),
steps_per_output=self.steps_per_print(),
monitor_memory=False,
)
self.tput_timer = ThroughputTimer(self._config.timers_config,
batch_size=self.train_batch_size(),
steps_per_output=self.steps_per_print(),
monitor_memory=False)

log_dist(f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}", ranks=[0])

Expand Down
3 changes: 2 additions & 1 deletion deepspeed/runtime/pipe/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs):

self._force_grad_boundary = False

self.batch_timer = ThroughputTimer(batch_size=self.train_batch_size(),
self.batch_timer = ThroughputTimer(self._config.timers_config,
batch_size=self.train_batch_size(),
logging_fn=self.tput_log,
monitor_memory=False,
steps_per_output=self.steps_per_print())
Expand Down
28 changes: 28 additions & 0 deletions deepspeed/utils/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

from deepspeed.runtime.config_utils import get_scalar_param, DeepSpeedConfigObject
from deepspeed.utils.constants import *


class DeepSpeedThroughputTimerConfig(DeepSpeedConfigObject):
    """Configuration for the throughput timer.

    Reads the "timers" -> "throughput" section of the DeepSpeed config dict
    (see TIMERS_FORMAT in deepspeed/utils/constants.py for the expected
    layout) and exposes the resulting settings as attributes.

    Attributes:
        enabled: whether the throughput timer runs at all.
        synchronized: whether the accelerator is synchronized before reading
            the clock (more accurate, but slower).
    """

    def __init__(self, param_dict):
        super(DeepSpeedThroughputTimerConfig, self).__init__()

        self.enabled = None
        self.synchronized = None

        # Drill down to the "throughput" sub-dict; a missing or empty config
        # simply means the defaults apply.
        timers_dict = (param_dict or {}).get(TIMERS, {}).get(TIMERS_THROUGHPUT, {})

        self._initialize(timers_dict)

    def _initialize(self, param_dict):
        # Populate fields, falling back to the defaults declared in
        # deepspeed/utils/constants.py.
        self.enabled = get_scalar_param(param_dict, TIMERS_THROUGHPUT_ENABLED, TIMERS_THROUGHPUT_ENABLED_DEFAULT)
        self.synchronized = get_scalar_param(param_dict, TIMERS_THROUGHPUT_SYNCHRONIZED,
                                             TIMERS_THROUGHPUT_SYNCHRONIZED_DEFAULT)
33 changes: 33 additions & 0 deletions deepspeed/utils/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team

#########################################
# Timers
#########################################
''' Configuration keys and defaults for the DeepSpeed throughput timer.
The timer measures per-step wall-clock time to report training throughput;
it is configured via the "timers" -> "throughput" section of the DeepSpeed
config dict, as shown in TIMERS_FORMAT below.
'''
TIMERS_FORMAT = '''
Timers should be enabled as:
"timers": {
  "throughput": {
    "enabled": true,
    "synchronized": true
  }
}
'''

# Top-level config key for the timers section and its throughput sub-section.
TIMERS = "timers"
TIMERS_THROUGHPUT = "throughput"

# Whether the throughput timer is active at all.
TIMERS_THROUGHPUT_ENABLED = "enabled"
TIMERS_THROUGHPUT_ENABLED_DEFAULT = True

# Synchronizing a device is required to produce the most accurate timer measurements.
# However, this comes at the expense of performance degradation. The CPU timer provides
# sufficient accuracy in many cases.
TIMERS_THROUGHPUT_SYNCHRONIZED = "synchronized"
TIMERS_THROUGHPUT_SYNCHRONIZED_DEFAULT = True
20 changes: 9 additions & 11 deletions deepspeed/utils/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,9 @@ def get_mean(self, names, normalizer=1.0, reset=True):

class ThroughputTimer:

def __init__(
self,
batch_size,
start_step=2,
steps_per_output=50,
monitor_memory=False,
logging_fn=None,
):
def __init__(self, config, batch_size, start_step=2, steps_per_output=50, monitor_memory=False, logging_fn=None):
from deepspeed.utils import logger
self.config = config
self.start_time = 0
self.end_time = 0
self.started = False
Expand Down Expand Up @@ -234,22 +228,26 @@ def _init_timer(self):
self.initialized = True

def start(self):
    """Mark the beginning of a timed step (no-op when timing is disabled)."""
    cfg = self.config
    if not cfg.enabled:
        return
    self._init_timer()
    self.started = True
    # Skip timing during the warm-up steps before start_step.
    if self.global_step_count < self.start_step:
        return
    if cfg.synchronized:
        # Device sync yields accurate wall time at some performance cost;
        # without it the (cheaper) CPU clock is used as-is.
        get_accelerator().synchronize()
    self.start_time = time.time()

def stop(self, global_step=False, report_speed=True):
if not self.started:
if not self.config.enabled or not self.started:
return
self.started = False
self.micro_step_count += 1
if global_step:
self.global_step_count += 1

if self.start_time > 0:
get_accelerator().synchronize()
if self.config.synchronized:
get_accelerator().synchronize()
self.end_time = time.time()
duration = self.end_time - self.start_time
self.total_elapsed_time += duration
Expand Down
Loading