diff --git a/tools/nni_trial_tool/hdfsClientUtility.py b/tools/nni_trial_tool/hdfsClientUtility.py index c732d2507c..9369e77a41 100644 --- a/tools/nni_trial_tool/hdfsClientUtility.py +++ b/tools/nni_trial_tool/hdfsClientUtility.py @@ -20,7 +20,6 @@ import os import posixpath -from pyhdfs import HdfsClient from .log_utils import LogType, nni_log def copyHdfsDirectoryToLocal(hdfsDirectory, localDirectory, hdfsClient): @@ -79,7 +78,8 @@ def copyDirectoryToHdfs(localDirectory, hdfsDirectory, hdfsClient): try: result = result and copyDirectoryToHdfs(file_path, hdfs_directory, hdfsClient) except Exception as exception: - nni_log(LogType.Error, 'Copy local directory {0} to hdfs directory {1} error: {2}'.format(file_path, hdfs_directory, str(exception))) + nni_log(LogType.Error, + 'Copy local directory {0} to hdfs directory {1} error: {2}'.format(file_path, hdfs_directory, str(exception))) result = False else: hdfs_file_path = os.path.join(hdfsDirectory, file) diff --git a/tools/nni_trial_tool/log_utils.py b/tools/nni_trial_tool/log_utils.py index 1806b06d79..8b07754ff7 100644 --- a/tools/nni_trial_tool/log_utils.py +++ b/tools/nni_trial_tool/log_utils.py @@ -33,8 +33,7 @@ from queue import Queue -from .rest_utils import rest_get, rest_post, rest_put, rest_delete -from .constants import NNI_EXP_ID, NNI_TRIAL_JOB_ID, STDOUT_API +from .rest_utils import rest_post from .url_utils import gen_send_stdout_url @unique @@ -154,8 +153,7 @@ def _populateQueue(stream, queue): self._is_read_completed = True break - self.pip_log_reader_thread = threading.Thread(target = _populateQueue, - args = (self.pipeReader, self.queue)) + self.pip_log_reader_thread = threading.Thread(target=_populateQueue, args=(self.pipeReader, self.queue)) self.pip_log_reader_thread.daemon = True self.start() self.pip_log_reader_thread.start() diff --git a/tools/nni_trial_tool/rest_utils.py b/tools/nni_trial_tool/rest_utils.py index 71eb353614..9f6227acbb 100644 --- a/tools/nni_trial_tool/rest_utils.py +++ b/tools/nni_trial_tool/rest_utils.py @@ -19,7 +19,6 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -import time import requests def rest_get(url, timeout): diff --git a/tools/nni_trial_tool/test/test_hdfsClientUtility.py b/tools/nni_trial_tool/test/test_hdfsClientUtility.py index 4a54a893c9..68ffe79d8f 100644 --- a/tools/nni_trial_tool/test/test_hdfsClientUtility.py +++ b/tools/nni_trial_tool/test/test_hdfsClientUtility.py @@ -18,16 +18,17 @@ # DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +import os +import shutil +import random +import string import unittest import json import sys from pyhdfs import HdfsClient +from tools.nni_trial_tool.hdfsClientUtility import copyFileToHdfs, copyDirectoryToHdfs sys.path.append("..") -from trial.hdfsClientUtility import copyFileToHdfs, copyDirectoryToHdfs -import os -import shutil -import random -import string + class HDFSClientUtilityTest(unittest.TestCase): '''Unit test for hdfsClientUtility.py''' @@ -82,7 +83,8 @@ def test_copy_directory_run(self): with open('./{0}/{1}'.format(directory_name, file_name), 'w') as file: file.write(file_content) - result = copyDirectoryToHdfs('./{}'.format(directory_name), '/{0}/{1}'.format(self.hdfs_config['userName'], directory_name), self.hdfs_client) + result = copyDirectoryToHdfs('./{}'.format(directory_name), + '/{0}/{1}'.format(self.hdfs_config['userName'], directory_name), self.hdfs_client) self.assertTrue(result) directory_list = self.hdfs_client.listdir('/{0}'.format(self.hdfs_config['userName'])) diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py index 23d9c4f1ab..dc37ce9334 100644 --- a/tools/nni_trial_tool/trial_keeper.py +++ b/tools/nni_trial_tool/trial_keeper.py @@ -18,32 +18,31 @@ # ============================================================================================================================== # import argparse -import sys import os -from subprocess import Popen, PIPE +from subprocess import Popen import time import logging import shlex import re import sys -import select import json import threading from pyhdfs import HdfsClient import pkg_resources from .rest_utils import rest_post, rest_get -from .url_utils import gen_send_stdout_url, gen_send_version_url, gen_parameter_meta_url +from .url_utils import gen_send_version_url, gen_parameter_meta_url -from .constants import HOME_DIR, LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \ +from .constants import LOG_DIR, NNI_PLATFORM, STDOUT_FULL_PATH, STDERR_FULL_PATH, \ MULTI_PHASE, NNI_TRIAL_JOB_ID, NNI_SYS_DIR, NNI_EXP_ID from .hdfsClientUtility import copyDirectoryToHdfs, copyHdfsDirectoryToLocal, copyHdfsFileToLocal -from .log_utils import LogType, nni_log, RemoteLogger, PipeLogReader, StdOutputType +from .log_utils import LogType, nni_log, RemoteLogger, StdOutputType logger = logging.getLogger('trial_keeper') regular = re.compile('v?(?P[0-9](\.[0-9]){0,1}).*') _hdfs_client = None + def get_hdfs_client(args): global _hdfs_client @@ -62,15 +61,18 @@ def get_hdfs_client(args): if hdfs_host is not None and args.nni_hdfs_exp_dir is not None: try: if args.webhdfs_path: - _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5) + _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, + webhdfs_path=args.webhdfs_path, timeout=5) else: # backward compatibility - _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5) + _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, + timeout=5) except Exception as e: nni_log(LogType.Error, 'Create HDFS client error: ' + str(e)) raise e return _hdfs_client + def main_loop(args): '''main loop logic for trial keeper''' @@ -79,9 +81,11 @@ def main_loop(args): stdout_file = open(STDOUT_FULL_PATH, 'a+') stderr_file = open(STDERR_FULL_PATH, 'a+') - trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection) + trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', + StdOutputType.Stdout, args.log_collection) # redirect trial keeper's stdout and stderr to syslog - trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection) + trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, + args.log_collection) sys.stdout = sys.stderr = trial_keeper_syslogger hdfs_output_dir = None @@ -97,8 +101,10 @@ def main_loop(args): # Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader() - process = Popen(args.trial_command, shell = True, stdout = log_pipe_stdout, stderr = log_pipe_stdout) - nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, shlex.split(args.trial_command))) + process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout) + nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, + shlex.split( + args.trial_command))) while True: retCode = process.poll() @@ -110,9 +116,11 @@ def main_loop(args): nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] try: if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client): - nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir)) + nni_log(LogType.Info, + 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir)) else: - nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir)) + nni_log(LogType.Info, + 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir)) except Exception as e: nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e)) raise e @@ -123,14 +131,16 @@ def main_loop(args): time.sleep(2) + def trial_keeper_help_info(*args): print('please run --help to see guidance') + def check_version(args): try: trial_keeper_version = pkg_resources.get_distribution('nni').version except pkg_resources.ResolutionError as err: - #package nni does not exist, try nni-tool package + # package nni does not exist, try nni-tool package nni_log(LogType.Error, 'Package nni does not exist!') os._exit(1) if not args.nni_manager_version: @@ -145,21 +155,26 @@ def check_version(args): log_entry = {} if trial_keeper_version != nni_manager_version: nni_log(LogType.Error, 'Version does not match!') - error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(nni_manager_version, trial_keeper_version) + error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format( + nni_manager_version, trial_keeper_version) log_entry['tag'] = 'VCFail' log_entry['msg'] = error_message - rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, False) + rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, + False) os._exit(1) else: nni_log(LogType.Info, 'Version match!') log_entry['tag'] = 'VCSuccess' - rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, False) + rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, + False) except AttributeError as err: nni_log(LogType.Error, err) + def is_multi_phase(): return MULTI_PHASE and (MULTI_PHASE in ['True', 'true']) + def download_parameter(meta_list, args): """ Download parameter file to local working directory. @@ -171,7 +186,8 @@ def download_parameter(meta_list, args): ] """ nni_log(LogType.Debug, str(meta_list)) - nni_log(LogType.Debug, 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID)) + nni_log(LogType.Debug, + 'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID)) nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR))) for meta in meta_list: if meta['experimentId'] == NNI_EXP_ID and meta['trialId'] == NNI_TRIAL_JOB_ID: @@ -180,6 +196,7 @@ def download_parameter(meta_list, args): hdfs_client = get_hdfs_client(args) copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False) + def fetch_parameter_file(args): class FetchThread(threading.Thread): def __init__(self, args): @@ -203,6 +220,7 @@ def run(self): fetch_file_thread = FetchThread(args) fetch_file_thread.start() + if __name__ == '__main__': '''NNI Trial Keeper main function''' PARSER = argparse.ArgumentParser() @@ -210,9 +228,9 @@ def run(self): PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process') PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP') PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port') - PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility + PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs') - PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility + PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs') PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs') PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs') @@ -233,4 +251,3 @@ def run(self): except Exception as e: nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e))) os._exit(1) -