This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Support heterogeneous environment service (#3097)
SparkSnail authored Dec 15, 2020
1 parent dec91f7 commit 872554f
Showing 27 changed files with 671 additions and 266 deletions.
52 changes: 52 additions & 0 deletions docs/en_US/TrainingService/HeterogeneousMode.rst
@@ -0,0 +1,52 @@
**Run an Experiment on Heterogeneous Mode**
===========================================

Running NNI in heterogeneous mode means that NNI will run trial jobs on multiple kinds of training platforms. For example, NNI could submit trial jobs to a remote machine and AML simultaneously.

Setup environment
-----------------

NNI supports `local <./LocalMode.md>`__, `remote <./RemoteMachineMode.md>`__, `pai <./PaiMode.md>`__ and `AML <./AMLMode.md>`__ as platforms for the heterogeneous training service. Before starting an experiment in this mode, users should set up the corresponding environment for each platform. More details about the environment setup can be found in the corresponding docs.



Run an experiment
-----------------

Use ``examples/trials/mnist-tfv1`` as an example. The content of the NNI config YAML file is as follows:

.. code-block:: yaml

   authorName: default
   experimentName: example_mnist
   trialConcurrency: 2
   maxExecDuration: 1h
   maxTrialNum: 10
   trainingServicePlatform: heterogeneous
   searchSpacePath: search_space.json
   #choice: true, false
   useAnnotation: false
   tuner:
     builtinTunerName: TPE
     classArgs:
       #choice: maximize, minimize
       optimize_mode: maximize
   trial:
     command: python3 mnist.py
     codeDir: .
     gpuNum: 1
   heterogeneousConfig:
     trainingServicePlatforms:
       - local
       - remote
   remoteConfig:
     reuse: true
   machineList:
     - ip: 10.1.1.1
       username: bob
       passwd: bob123

Configurations for heterogeneous mode:

heterogeneousConfig:

* trainingServicePlatforms: required key. This field specifies the platforms used in heterogeneous mode; the value uses the YAML list format. NNI supports setting ``local``, ``remote``, ``aml``, and ``pai`` in this field.


Note:
If a platform is set in ``trainingServicePlatforms``, users should also set the corresponding configuration for that platform. For example, if ``remote`` is set as one of the platforms, ``machineList`` and ``remoteConfig`` must also be set. nnictl enforces this rule, as sketched below.
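
The nnictl validator added in this commit enforces exactly this rule. Here is a standalone sketch of the check (``check_heterogeneous_config`` is a hypothetical name used for illustration; the real method is ``validate_heterogeneous_platforms`` in ``nni/tools/nnictl/config_schema.py``, shown further down in this commit):

.. code-block:: python

   # Cross-field check: every platform listed in trainingServicePlatforms
   # must come with its platform-specific config section.
   REQUIRED_CONFIG_NAME_MAP = {
       'remote': 'machineList',
       'aml': 'amlConfig',
       'pai': 'paiConfig',
   }

   def check_heterogeneous_config(experiment_config: dict) -> None:
       if experiment_config.get('trainingServicePlatform') != 'heterogeneous':
           return
       for platform in experiment_config['heterogeneousConfig']['trainingServicePlatforms']:
           config_name = REQUIRED_CONFIG_NAME_MAP.get(platform)
           if config_name and not experiment_config.get(config_name):
               raise ValueError('Need to set {0} for {1} in heterogeneous mode!'.format(config_name, platform))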
1 change: 1 addition & 0 deletions docs/en_US/training_services.rst
@@ -12,3 +12,4 @@ Introduction to NNI Training Services
     FrameworkController<./TrainingService/FrameworkControllerMode>
     DLTS<./TrainingService/DLTSMode>
     AML<./TrainingService/AMLMode>
+    Heterogeneous<./TrainingService/HeterogeneousMode>
32 changes: 32 additions & 0 deletions examples/trials/mnist-tfv1/config_heterogeneous.yml
@@ -0,0 +1,32 @@
authorName: default
experimentName: example_mnist
trialConcurrency: 3
maxExecDuration: 1h
maxTrialNum: 10
trainingServicePlatform: heterogeneous
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
  #choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
  #SMAC (SMAC should be installed through nnictl)
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  gpuNum: 0
heterogeneousConfig:
  trainingServicePlatforms:
    - local
    - remote
remoteConfig:
  reuse: true
machineList:
  - ip: 10.1.1.1
    username: bob
    passwd: bob123
    #port can be skipped if using default ssh port 22
    #port: 22
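
As a quick sanity check of this example file, one can load it and inspect the platform list (a minimal sketch, assuming PyYAML is installed and the repository-relative path below; not part of this commit):

import yaml

# Load the example heterogeneous config and show which platforms it targets.
with open('examples/trials/mnist-tfv1/config_heterogeneous.yml') as f:
    config = yaml.safe_load(f)

print(config['trainingServicePlatform'])                          # heterogeneous
print(config['heterogeneousConfig']['trainingServicePlatforms'])  # ['local', 'remote']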
3 changes: 2 additions & 1 deletion nni/runtime/env_vars.py
@@ -12,7 +12,8 @@
     'NNI_SYS_DIR',
     'NNI_OUTPUT_DIR',
     'NNI_TRIAL_SEQ_ID',
-    'MULTI_PHASE'
+    'MULTI_PHASE',
+    'REUSE_MODE'
 ]
 
 _dispatcher_env_var_names = [
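
For context, a minimal sketch of how a registered name such as REUSE_MODE becomes an attribute on trial_env_vars (the namedtuple-based loader here is an assumption mirroring the shape of env_vars.py, not a verbatim copy):

import os
from collections import namedtuple

_trial_env_var_names = ['NNI_PLATFORM', 'MULTI_PHASE', 'REUSE_MODE']

def _load_env_vars(names):
    # Absent variables load as None, so callers can simply truth-test them.
    EnvVars = namedtuple('EnvVars', names)
    return EnvVars(**{name: os.environ.get(name) for name in names})

trial_env_vars = _load_env_vars(_trial_env_var_names)
print(trial_env_vars.REUSE_MODE)  # 'true' when the trial runner exports REUSE_MODE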
2 changes: 1 addition & 1 deletion nni/runtime/log.py
@@ -31,7 +31,7 @@ def init_logger() -> None:
     if trial_platform == 'unittest':
         return
 
-    if trial_platform:
+    if trial_platform and not trial_env_vars.REUSE_MODE:
         _init_logger_trial()
         return
 
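
The effect of the new condition: in reuse mode the shared trial runner owns stdout, so the per-trial file logger is skipped. A simplified sketch of the decision (choose_log_sink is a hypothetical helper for illustration, not NNI API):

import os

def choose_log_sink():
    platform = os.environ.get('NNI_PLATFORM')
    reuse = os.environ.get('REUSE_MODE')
    if platform == 'unittest':
        return 'none'
    if platform and not reuse:
        return 'per-trial log file'  # the _init_logger_trial() path
    return 'stdout, managed by the runner or nnictl'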
2 changes: 1 addition & 1 deletion nni/runtime/platform/__init__.py
@@ -9,7 +9,7 @@
     from .standalone import *
 elif trial_env_vars.NNI_PLATFORM == 'unittest':
     from .test import *
-elif trial_env_vars.NNI_PLATFORM in ('adl', 'local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml'):
+elif trial_env_vars.NNI_PLATFORM in ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml', 'adl', 'heterogeneous'):
     from .local import *
 else:
     raise RuntimeError('Unknown platform %s' % trial_env_vars.NNI_PLATFORM)
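
The same dispatch rendered as a lookup, to make the point of the change explicit: 'heterogeneous' trials reuse the local platform implementation, since reused runner environments execute trials with local-style conventions (resolve_platform_module is a hypothetical sketch; the module really uses the elif chain above):

def resolve_platform_module(platform: str) -> str:
    local_like = ('local', 'remote', 'pai', 'kubeflow', 'frameworkcontroller',
                  'paiYarn', 'dlts', 'aml', 'adl', 'heterogeneous')
    if platform == 'standalone':
        return '.standalone'
    if platform == 'unittest':
        return '.test'
    if platform in local_like:
        return '.local'  # all local-like platforms share one implementation
    raise RuntimeError('Unknown platform %s' % platform)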
3 changes: 2 additions & 1 deletion nni/runtime/platform/local.py
@@ -19,6 +19,7 @@
 if not os.path.exists(_outputdir):
     os.makedirs(_outputdir)
 
+_reuse_mode = trial_env_vars.REUSE_MODE
 _nni_platform = trial_env_vars.NNI_PLATFORM
 
 _multiphase = trial_env_vars.MULTI_PHASE
@@ -58,7 +59,7 @@ def get_next_parameter():
     return params
 
 def send_metric(string):
-    if _nni_platform != 'local':
+    if _nni_platform != 'local' or _reuse_mode in ('true', 'True'):
         assert len(string) < 1000000, 'Metric too long'
         print("NNISDK_MEb'%s'" % (string), flush=True)
     else:
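
With this change, a reuse-mode trial reports metrics over stdout using the NNISDK_MEb prefix (parsed by the trial runner) instead of local mode's metrics file. An illustrative, self-contained rendering of that branch (the payload fields are assumptions for the example, not NNI's exact wire format):

import json

def emit_metric_for_runner(value, sequence=0):
    # Reuse mode / non-local platforms: print a prefixed line for the runner to parse.
    string = json.dumps({'type': 'FINAL', 'sequence': sequence, 'value': value})
    assert len(string) < 1000000, 'Metric too long'
    print("NNISDK_MEb'%s'" % string, flush=True)

emit_metric_for_runner(0.97)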
47 changes: 43 additions & 4 deletions nni/tools/nnictl/config_schema.py
@@ -124,7 +124,7 @@ def validate(self, data):
     Optional('maxExecDuration'): And(Regex(r'^[1-9][0-9]*[s|m|h|d]$', error='ERROR: maxExecDuration format is [digit]{s,m,h,d}')),
     Optional('maxTrialNum'): setNumberRange('maxTrialNum', int, 1, 99999),
     'trainingServicePlatform': setChoice(
-        'trainingServicePlatform', 'adl', 'remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml'),
+        'trainingServicePlatform', 'remote', 'local', 'pai', 'kubeflow', 'frameworkcontroller', 'paiYarn', 'dlts', 'aml', 'adl', 'heterogeneous'),
     Optional('searchSpacePath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'searchSpacePath'),
     Optional('multiPhase'): setType('multiPhase', bool),
     Optional('multiThread'): setType('multiThread', bool),
@@ -208,7 +208,7 @@ def validate(self, data):
 }
 
 pai_config_schema = {
-    'paiConfig': {
+    Optional('paiConfig'): {
         'userName': setType('userName', str),
         Or('passWord', 'token', only_one=True): str,
         'host': setType('host', str),
@@ -252,7 +252,7 @@ def validate(self, data):
 }
 
 aml_config_schema = {
-    'amlConfig': {
+    Optional('amlConfig'): {
         'subscriptionId': setType('subscriptionId', str),
         'resourceGroup': setType('resourceGroup', str),
         'workspaceName': setType('workspaceName', str),
@@ -262,6 +262,29 @@
     }
 }
 
+heterogeneous_trial_schema = {
+    'trial': {
+        'codeDir': setPathCheck('codeDir'),
+        Optional('nniManagerNFSMountPath'): setPathCheck('nniManagerNFSMountPath'),
+        Optional('containerNFSMountPath'): setType('containerNFSMountPath', str),
+        Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'),
+        'command': setType('command', str),
+        Optional('gpuNum'): setNumberRange('gpuNum', int, 0, 99999),
+        Optional('cpuNum'): setNumberRange('cpuNum', int, 0, 99999),
+        Optional('memoryMB'): setType('memoryMB', int),
+        Optional('image'): setType('image', str),
+        Optional('virtualCluster'): setType('virtualCluster', str),
+        Optional('paiStorageConfigName'): setType('paiStorageConfigName', str),
+        Optional('paiConfigPath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'paiConfigPath')
+    }
+}
+
+heterogeneous_config_schema = {
+    'heterogeneousConfig': {
+        'trainingServicePlatforms': ['local', 'remote', 'pai', 'aml']
+    }
+}
+
 adl_trial_schema = {
     'trial':{
         'codeDir': setType('codeDir', str),
@@ -404,7 +427,7 @@ def validate(self, data):
 }
 
 machine_list_schema = {
-    'machineList': [Or(
+    Optional('machineList'): [Or(
         {
             'ip': setType('ip', str),
             Optional('port'): setNumberRange('port', int, 1, 65535),
@@ -438,6 +461,8 @@
     'frameworkcontroller': Schema({**common_schema, **frameworkcontroller_trial_schema, **frameworkcontroller_config_schema}),
     'aml': Schema({**common_schema, **aml_trial_schema, **aml_config_schema}),
     'dlts': Schema({**common_schema, **dlts_trial_schema, **dlts_config_schema}),
+    'heterogeneous': Schema({**common_schema, **heterogeneous_trial_schema, **heterogeneous_config_schema, **machine_list_schema,
+                             **pai_config_schema, **aml_config_schema, **remote_config_schema}),
 }


@@ -454,6 +479,7 @@ def validate_extras(self, experiment_config):
         self.validate_pai_trial_conifg(experiment_config)
         self.validate_kubeflow_operators(experiment_config)
         self.validate_eth0_device(experiment_config)
+        self.validate_heterogeneous_platforms(experiment_config)
 
     def validate_tuner_adivosr_assessor(self, experiment_config):
         if experiment_config.get('advisor'):
@@ -563,3 +589,16 @@ def validate_eth0_device(self, experiment_config):
                 and not experiment_config.get('nniManagerIp') \
                 and 'eth0' not in netifaces.interfaces():
             raise SchemaError('This machine does not contain eth0 network device, please set nniManagerIp in config file!')
+
+    def validate_heterogeneous_platforms(self, experiment_config):
+        required_config_name_map = {
+            'remote': 'machineList',
+            'aml': 'amlConfig',
+            'pai': 'paiConfig'
+        }
+        if experiment_config.get('trainingServicePlatform') == 'heterogeneous':
+            for platform in experiment_config['heterogeneousConfig']['trainingServicePlatforms']:
+                config_name = required_config_name_map.get(platform)
+                if config_name and not experiment_config.get(config_name):
+                    raise SchemaError('Need to set {0} for {1} in heterogeneous mode!'.format(config_name, platform))
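
To see the new schema entry in action, here is a hedged round-trip using the schema package that config_schema.py builds on (the Schema below is pared down for brevity and is not the full combined schema from this file):

from schema import Optional, Schema, SchemaError

# A list-valued schema means "every element must match one of these values",
# which is how trainingServicePlatforms is constrained above.
heterogeneous_schema = Schema({
    'trainingServicePlatform': 'heterogeneous',
    'heterogeneousConfig': {
        'trainingServicePlatforms': ['local', 'remote', 'pai', 'aml'],
    },
    Optional('machineList'): [dict],
}, ignore_extra_keys=True)

config = {
    'trainingServicePlatform': 'heterogeneous',
    'heterogeneousConfig': {'trainingServicePlatforms': ['local', 'remote']},
    'machineList': [{'ip': '10.1.1.1', 'username': 'bob', 'passwd': 'bob123'}],
}

try:
    heterogeneous_schema.validate(config)
    print('config accepted')
except SchemaError as err:
    print('config rejected:', err)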

55 changes: 48 additions & 7 deletions nni/tools/nnictl/launcher.py
@@ -118,13 +118,6 @@ def set_local_config(experiment_config, port, config_file_name):
     request_data = dict()
     if experiment_config.get('localConfig'):
         request_data['local_config'] = experiment_config['localConfig']
-        if request_data['local_config']:
-            if request_data['local_config'].get('gpuIndices') and isinstance(request_data['local_config'].get('gpuIndices'), int):
-                request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
-            if request_data['local_config'].get('maxTrialNumOnEachGpu'):
-                request_data['local_config']['maxTrialNumOnEachGpu'] = request_data['local_config'].get('maxTrialNumOnEachGpu')
-            if request_data['local_config'].get('useActiveGpu'):
-                request_data['local_config']['useActiveGpu'] = request_data['local_config'].get('useActiveGpu')
     response = rest_put(cluster_metadata_url(port), json.dumps(request_data), REST_TIME_OUT)
     err_message = ''
     if not response or not check_response(response):
@@ -306,6 +299,37 @@ def set_aml_config(experiment_config, port, config_file_name):
     #set trial_config
     return set_trial_config(experiment_config, port, config_file_name), err_message
 
+def set_heterogeneous_config(experiment_config, port, config_file_name):
+    '''set heterogeneous configuration'''
+    heterogeneous_config_data = dict()
+    heterogeneous_config_data['heterogeneous_config'] = experiment_config['heterogeneousConfig']
+    platform_list = experiment_config['heterogeneousConfig']['trainingServicePlatforms']
+    for platform in platform_list:
+        if platform == 'aml':
+            heterogeneous_config_data['aml_config'] = experiment_config['amlConfig']
+        elif platform == 'remote':
+            if experiment_config.get('remoteConfig'):
+                heterogeneous_config_data['remote_config'] = experiment_config['remoteConfig']
+            heterogeneous_config_data['machine_list'] = experiment_config['machineList']
+        elif platform == 'local' and experiment_config.get('localConfig'):
+            heterogeneous_config_data['local_config'] = experiment_config['localConfig']
+        elif platform == 'pai':
+            heterogeneous_config_data['pai_config'] = experiment_config['paiConfig']
+    response = rest_put(cluster_metadata_url(port), json.dumps(heterogeneous_config_data), REST_TIME_OUT)
+    err_message = None
+    if not response or not response.status_code == 200:
+        if response is not None:
+            err_message = response.text
+            _, stderr_full_path = get_log_path(config_file_name)
+            with open(stderr_full_path, 'a+') as fout:
+                fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
+        return False, err_message
+    result, message = setNNIManagerIp(experiment_config, port, config_file_name)
+    if not result:
+        return result, message
+    #set trial_config
+    return set_trial_config(experiment_config, port, config_file_name), err_message
+
 def set_experiment(experiment_config, mode, port, config_file_name):
     '''Call startExperiment (rest POST /experiment) with yaml file content'''
     request_data = dict()
@@ -387,6 +411,21 @@ def set_experiment(experiment_config, mode, port, config_file_name):
             {'key': 'aml_config', 'value': experiment_config['amlConfig']})
         request_data['clusterMetaData'].append(
             {'key': 'trial_config', 'value': experiment_config['trial']})
+    elif experiment_config['trainingServicePlatform'] == 'heterogeneous':
+        request_data['clusterMetaData'].append(
+            {'key': 'heterogeneous_config', 'value': experiment_config['heterogeneousConfig']})
+        platform_list = experiment_config['heterogeneousConfig']['trainingServicePlatforms']
+        request_dict = {
+            'aml': {'key': 'aml_config', 'value': experiment_config.get('amlConfig')},
+            'remote': {'key': 'machine_list', 'value': experiment_config.get('machineList')},
+            'pai': {'key': 'pai_config', 'value': experiment_config.get('paiConfig')},
+            'local': {'key': 'local_config', 'value': experiment_config.get('localConfig')}
+        }
+        for platform in platform_list:
+            if request_dict.get(platform):
+                request_data['clusterMetaData'].append(request_dict[platform])
+        request_data['clusterMetaData'].append(
+            {'key': 'trial_config', 'value': experiment_config['trial']})
     response = rest_post(experiment_url(port), json.dumps(request_data), REST_TIME_OUT, show_error=True)
     if check_response(response):
         return response
@@ -420,6 +459,8 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res
         config_result, err_msg = set_dlts_config(experiment_config, port, config_file_name)
     elif platform == 'aml':
         config_result, err_msg = set_aml_config(experiment_config, port, config_file_name)
+    elif platform == 'heterogeneous':
+        config_result, err_msg = set_heterogeneous_config(experiment_config, port, config_file_name)
     else:
         raise Exception(ERROR_INFO % 'Unsupported platform!')
         exit(1)
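
For reference, the clusterMetaData payload that set_experiment() assembles for the local + remote example above would look roughly like this (values taken from the sample config; an illustration of the code path, not a captured request):

import json

request_data = {
    'clusterMetaData': [
        {'key': 'heterogeneous_config',
         'value': {'trainingServicePlatforms': ['local', 'remote']}},
        {'key': 'machine_list',
         'value': [{'ip': '10.1.1.1', 'username': 'bob', 'passwd': 'bob123'}]},
        {'key': 'trial_config',
         'value': {'command': 'python3 mnist.py', 'codeDir': '.', 'gpuNum': 0}},
    ]
}
print(json.dumps(request_data, indent=2))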
2 changes: 1 addition & 1 deletion nni/tools/trial_tool/trial_runner.py
@@ -25,7 +25,6 @@ def main_loop(args):
     '''main loop logic for trial runner'''
     idle_last_time = datetime.now()
     gpu_refresh_last_time = datetime.now() - timedelta(minutes=1)
-
     try:
         if args.job_pid_file:
             with open(args.job_pid_file, 'w') as job_file:
@@ -188,6 +187,7 @@ def check_version(args):
     os.environ['NNI_EXP_ID'] = args.exp_id
     os.environ['MULTI_PHASE'] = "true"
     os.environ['NNI_TRIAL_JOB_ID'] = "runner"
+    os.environ['REUSE_MODE'] = "true"
 
     from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log
     from .trial import Trial
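
Putting it together: the runner process exports these variables once, and every trial it spawns inherits them, which is what flips the SDK into the reuse-mode behavior shown earlier in this commit. A simplified sketch (the platform value, experiment id, and script are placeholders):

import os
import subprocess

env = dict(os.environ)
env.update({
    'NNI_PLATFORM': 'heterogeneous',  # placeholder platform value
    'NNI_EXP_ID': 'EXP_ID',           # placeholder experiment id
    'NNI_TRIAL_JOB_ID': 'runner',
    'MULTI_PHASE': 'true',
    'REUSE_MODE': 'true',             # set once in check_version() above
})
# Trials inherit REUSE_MODE, so nni.runtime.log leaves stdout alone and
# nni.runtime.platform.local prints NNISDK_MEb metric lines for the runner.
subprocess.Popen(['python3', 'mnist.py'], env=env)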