diff --git a/README.md b/README.md index 68f429be..6a11ff5b 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ The dependencies of MI-prometheus are: * torchtext * tensorboardx * matplotlib + * psutil (enables grid-* to span child processes on MacOS and Ubuntu) * PyYAML * tqdm * nltk diff --git a/configs/example_trainer_gpu.yaml b/configs/example_trainer_gpu.yaml deleted file mode 100644 index a0d785ce..00000000 --- a/configs/example_trainer_gpu.yaml +++ /dev/null @@ -1,15 +0,0 @@ -batch_settings: - experiment_repetitions: 1 # number of experiments - max_concurrent_runs: 1 # number of gpus - -batch_tasks: - - - default_configs: configs/dwm_baselines/dwm/serial_recall.yaml - overwrite: - problem: - cuda: True - - - default_configs: configs/dwm_baselines/dwm/reverse_recall.yaml - overwrite: - problem: - cuda: True diff --git a/configs/vision/alexnet_mnist.yaml b/configs/vision/alexnet_mnist.yaml index 55c4e368..fa218374 100644 --- a/configs/vision/alexnet_mnist.yaml +++ b/configs/vision/alexnet_mnist.yaml @@ -17,7 +17,7 @@ training: lr: 0.01 # settings parameters terminal_conditions: - loss_stop: 1.0e-5 + loss_stop: 1.0e-3 episode_limit: 50000 epochs_limit: 10 @@ -25,7 +25,7 @@ training: validation: problem: name: *name - batch_size: 64 + batch_size: *b use_train_data: True # True because we are splitting the training set to: validation and training resize: [224, 224] @@ -33,7 +33,7 @@ validation: testing: problem: name: *name - batch_size: 64 + batch_size: *b use_train_data: False resize: [224, 224] diff --git a/configs/vision/grid_trainer_mnist.yaml b/configs/vision/grid_trainer_mnist.yaml new file mode 100644 index 00000000..2e501f40 --- /dev/null +++ b/configs/vision/grid_trainer_mnist.yaml @@ -0,0 +1,43 @@ +grid_tasks: + - + default_configs: configs/vision/lenet5_mnist.yaml + - + default_configs: configs/vision/simplecnn_mnist.yaml + +# Set exactly the same experiment conditions for the 2 tasks. +grid_overwrite: + training: + problem: + batch_size: &b 1000 + sampler: + name: SubsetRandomSampler + indices: [0, 55000] + # Set the same optimizer parameters. + optimizer: + name: Adam + lr: 0.01 + # Set the same terminal conditions. + terminal_conditions: + loss_stop: 4.0e-2 + episode_limit: 10000 + epoch_limit: 10 + + # Problem parameters: + validation: + problem: + batch_size: *b + sampler: + name: SubsetRandomSampler + indices: [55000, 60000] + + testing: + problem: + batch_size: *b + +grid_settings: + # Set number of repetitions of each experiments. + experiment_repetitions: 5 + # Set number of concurrent running experiments. + max_concurrent_runs: 4 + # Set trainer. + trainer: mip-online-trainer diff --git a/configs/vision/lenet5_mnist.yaml b/configs/vision/lenet5_mnist.yaml index 27ee5452..ad1d8564 100644 --- a/configs/vision/lenet5_mnist.yaml +++ b/configs/vision/lenet5_mnist.yaml @@ -2,9 +2,9 @@ training: problem: name: &name MNIST - batch_size: 64 + batch_size: &b 64 use_train_data: True - mnist_folder: &folder '~/data/mnist' + data_folder: &folder '~/data/mnist' resize: [32, 32] # Use sampler that operates on a subset. 
sampler: @@ -15,19 +15,19 @@ training: name: Adam lr: 0.01 # settings parameters - #terminal_condition: - # loss_stop: 1.0e-5 - # episode_limit: 10000 - # epoch_limit: 10 + terminal_conditions: + loss_stop: 1.0e-2 + episode_limit: 10000 + epoch_limit: 10 # Validation parameters: validation: - partial_validation_interval: 100 + #partial_validation_interval: 100 problem: name: *name - batch_size: 64 - use_train_data: True - mnist_folder: *folder + batch_size: *b + use_train_data: True # True because we are splitting the training set to: validation and training + data_folder: *folder resize: [32, 32] # Use sampler that operates on a subset. sampler: @@ -38,9 +38,9 @@ validation: testing: problem: name: *name - batch_size: 10000 + batch_size: *b use_train_data: False - mnist_folder: *folder + data_folder: *folder resize: [32, 32] # Model parameters: diff --git a/configs/vision/simplecnn_mnist.yaml b/configs/vision/simplecnn_mnist.yaml index 0e91ea7b..f4f2d1d4 100644 --- a/configs/vision/simplecnn_mnist.yaml +++ b/configs/vision/simplecnn_mnist.yaml @@ -5,7 +5,7 @@ training: problem: name: &name MNIST batch_size: &b 64 - mnist_folder: &folder '~/data/mnist' + data_folder: &folder '~/data/mnist' use_train_data: True resize: [32, 32] sampler: @@ -18,16 +18,16 @@ training: lr: 0.01 # settings parameters terminal_conditions: - loss_stop: 1.0e-5 - episode_limit: 20000 + loss_stop: 1.0e-3 + episode_limit: 1000 epoch_limit: 1 # Problem parameters: validation: problem: name: *name - batch_size: 5000 - mnist_folder: *folder + batch_size: *b + data_folder: *folder use_train_data: True # True because we are splitting the training set to: validation and training resize: [32, 32] sampler: @@ -43,8 +43,8 @@ testing: #seed_torch: 2452 problem: name: *name - batch_size: 10000 - mnist_folder: *folder + batch_size: *b + data_folder: *folder use_train_data: False resize: [32, 32] diff --git a/doc_build.sh b/doc_build.sh index 337af871..b3ca561e 100755 --- a/doc_build.sh +++ b/doc_build.sh @@ -8,4 +8,7 @@ sphinx-build -b html source build make html # open web browser(s) to master table of content -firefox build/index.html +if which firefox +then + firefox build/index.html +fi diff --git a/miprometheus/grid_workers/grid_analyzer.py b/miprometheus/grid_workers/grid_analyzer.py index 06a47c49..bbb8ece7 100644 --- a/miprometheus/grid_workers/grid_analyzer.py +++ b/miprometheus/grid_workers/grid_analyzer.py @@ -28,221 +28,379 @@ import os import csv import yaml -import numpy as np +import torch +import logging from datetime import datetime -from functools import partial -from multiprocessing.pool import ThreadPool from miprometheus.grid_workers.grid_worker import GridWorker class GridAnalyzer(GridWorker): """ - Implementation of the Grid Analyzer. Post-processes the test results of a grid of experiments and gather them in\ - a csv file. + Implementation of the :py:class:`miprometheus.grid_workers.GridAnalyzer`. - Inherits from ``GridWorker``. + Post-processes the test results of a grid of experiments and gather them in a csv file. + + This csv file will gather the training statistics (seeds, accuracies, terminal conditions,...), \ + the validation statistics and the test statistics. + + Inherits from :py:class:`miprometheus.grid_workers.GridWorker`. 
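# --- Illustrative sketch (annotation, not part of this patch) ---------------
# Programmatic use of the analyzer, mirroring the main() entry point at the
# bottom of this file. The import path is assumed from the :py:class:
# cross-references used in the docstrings; setup must run before the analysis.
from miprometheus.grid_workers import GridAnalyzer

analyzer = GridAnalyzer()
analyzer.setup_grid_experiment()   # parses --expdir and collects valid experiment folders
analyzer.run_grid_experiment()     # writes <expdir>/<timestamp>_grid_analysis.csv
# ----------------------------------------------------------------------------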
- TODO: complete doc """ - def __init__(self, name="GridAnalyzer", use_gpu=False): + def __init__(self, name="GridAnalyzer"): """ - Constructor for the ``GridAnalyzer``: - - - TODO: complete doc + Constructor for the :py:class:`miprometheus.grid_workers.GridAnalyzer`: + - Calls basic constructor of :py:class:`miprometheus.grid_workers.GridWorker` :param name: Name of the worker (DEFAULT: "GridAnalyzer"). :type name: str - :param use_gpu: Indicates whether the worker should use GPU or not. - :type use_gpu: bool - """ # call base constructor - super(GridAnalyzer, self).__init__(name=name,use_gpu=use_gpu) + super(GridAnalyzer, self).__init__(name=name, use_gpu=False) + + @staticmethod + def check_if_file_exists(dir_, filename_): + """ + Checks if ``filename_`` exists in ``dir_``. + + :param dir_: Path to file. + :type dir_: str + + :param filename_: Name of the file to be opened and analysed. + :type filename_: str + + :return: True if the file exists in the directory, else False + + """ + return os.path.isfile(os.path.join(dir_, filename_)) + + def check_file_content(self, dir_, filename_): + """ + Checks if the number of lines in the file is > 1. + + :param dir_: Path to file. + :type dir_: str + + :param filename_: Name of the file to be opened and analyzed. + :type filename_: str + + :return: True if the number of lines in the file is strictly greater than one. + + """ + return self.get_lines_number(os.path.join(dir_, filename_)) > 1 + + @staticmethod + def get_lines_number(filename_): + """ + Returns the number of lines in ``filename_``. + + :param filename_: Filepath to be opened and line-read. + :type filename_: str + + :return: Number of lines in the file. + + """ + with open(filename_) as f: + return sum(1 for _ in f) + + def get_experiment_tests(self, experiment_path_): + """ + Returns a list of folders containing valid test experiments data: + + - A configuration (`testing_configuration.yaml`), + - A csv file containing a data point for the aggregated statistics (`testing_set_agg_statistics.csv`) + + + :param experiment_path_: Path to experiment (training) folder. + :type experiment_path_: str + + :return: A list of valid test experiment folders. + + """ + experiments_tests = [] + for root, dirs, _ in os.walk(experiment_path_, topdown=True): + for name in dirs: + experiments_tests.append(os.path.join(root, name)) + + # Keep only the folders that contain a test configuration file and a csv statistics file. + experiments_tests = [elem for elem in experiments_tests if + self.check_if_file_exists(elem, 'testing_configuration.yaml') and + self.check_if_file_exists(elem, 'testing_set_agg_statistics.csv')] + + # Check if the csv file contains at least one data point. + experiments_tests = [elem for elem in experiments_tests if + self.check_file_content(elem, 'testing_set_agg_statistics.csv')] + + return experiments_tests def setup_grid_experiment(self): """ Setups the overall experiment: - - Calls the ``super(self).setup_experiment()`` to parse arguments, + - Parses arguments and sets logger level, + - Checks the presence of experiments folder, + - Recursively traverses the experiment folders, cherry-picking subfolders containing: + + - (a) 'training_configuration.yaml' (training configuration file), + - (b) 'models/model_best.pt' (checkpoint of the best saved model). - - Recursively creates the paths to the experiments folders, verifying that they are valid (e.g. \ - contain `training_statistics.csv`, `validation_statistics.csv` and `testing_statistics.csv`). 
""" - super(GridAnalyzer, self).setup_grid_experiment() + # Parse arguments. + self.flags, self.unparsed = self.parser.parse_known_args() + + # Set logger depending on the settings. + self.logger.setLevel(getattr(logging, self.flags.log_level.upper(), None)) # Check if experiments directory was indicated. - if self.flags.outdir == '': - print('Please pass the experiments directory as --outdir') + if self.flags.expdir == '': + print('Please pass the experiments directory as --expdir') exit(-1) - self.directory_chckpnts = self.flags.outdir + # Get experiment directory. + self.experiment_rootdir = self.flags.expdir - # get all sub-directories paths in outdir + # Get all sub-directories paths in expdir. self.experiments_list = [] - for root, dirs, files in os.walk(self.directory_chckpnts, topdown=True): + for root, dirs, _ in os.walk(self.experiment_rootdir, topdown=True): for name in dirs: self.experiments_list.append(os.path.join(root, name)) - # Keep only the folders that contain validation.csv and training.csv - self.experiments_list = [elem for elem in self.experiments_list if os.path.isfile( - elem + '/validation_statistics.csv') and os.path.isfile(elem + '/training_statistics.csv')] + # Keep only the folders that contain training_configuration.yaml, training_statistics.csv and + # training.csv and model (which contains aggregated validation statistics). + self.experiments_list = [elem for elem in self.experiments_list if + self.check_if_file_exists(elem, 'training_configuration.yaml') and + self.check_if_file_exists(elem, 'models/model_best.pt')] + + # Check if there are some valid folders. + if len(self.experiments_list) == 0: + self.logger.error("There are no valid experiment folders in {} directory!".format(self.experiment_rootdir)) + exit(-2) + + # List folders with "valid" experiment data. + exp_str = "Found the following valid experiments in directory: {} \n".format(self.experiment_rootdir) + exp_str += '='*80 + '\n' + for exp in self.experiments_list: + exp_str += " - {}\n".format(exp) + exp_str += '='*80 + '\n' + self.logger.info(exp_str) + + # Ask for confirmation - optional. + if self.flags.user_confirm: + try: + input('Press to confirm and start the grid analyzis\n') + except KeyboardInterrupt: + exit(0) - # check if the files are empty except for the first line - self.experiments_list = [elem for elem in self.experiments_list if os.stat( - elem + '/validation_statistics.csv').st_size > 24 and os.stat(elem + '/training_statistics.csv').st_size > 24] + def run_experiment(self, experiment_path: str): + """ + Collects the training / validation / test statistics for a given experiment path. 
- # the following is to detect how many tests runs have been done for each experiment, - # and asserting that the number is the same for all experiment - number_of_test = [] - for path in self.experiments_list: - experiments_tests = [] - for root, dirs, files in os.walk(path, topdown=True): - for name in dirs: - experiments_tests.append(os.path.join(root, name)) + Analyzes whether the given training experiment folder contains subfolders with test experiments data: - # Keep only the folders that contain `testing.csv` - experiments_tests = [elem for elem in experiments_tests if os.path.isfile(elem + '/testing_statistics.csv')] + - Loads and parses training configuration file, + - Loads checkpoint with model and training and validation statistics, + - Recursively traverses subdirectories looking for test experiments, - # check if that the `testing.csv` files are not empty - experiments_tests = [elem for elem in experiments_tests if os.stat(elem + '/testing_statistics.csv').st_size > 24] - number_of_test.append(len(experiments_tests)) + .. note:: - assert len(set(number_of_test)) == 1, 'Not all experiments have the same number of tests' - self.nb_tests = number_of_test[0] - self.logger.info('Detected a number of tests per experiment of {}.'.format(self.nb_tests)) + We require that the test statistics csv files are valid, i.e. contain at least one line with \ + collected statistics (excluding the header). - def run_experiment(self, experiment_path: str): - """ - Analyzes test results. - TODO: complete doc + - Collects statistics from training, validation (from model checkpoint) and test experiments \ + (from test csv files found in subdirectories). - :param experiment_path: Path to an experiment folder containing a trained model. + :param experiment_path: Path to an experiment folder containing a training statistics. :type experiment_path: str + :return: Four dictionaries containing: + - Status info (model, problem etc.), + - Training statistics, + - Validation statistics, + - Test statistics. - ..note:: - Visualization is deactivated to avoid any user interaction. + """ + self.logger.info('Analyzing experiments from: {}'.format(experiment_path)) - TODO: anything else? + # Create dictionaries. + status_dict = dict() + train_dict = dict() + valid_dict = dict() + # Load yaml file, to get model name, problem name and random seeds. + with open(os.path.join(experiment_path, 'training_configuration.yaml'), 'r') as yaml_file: + params = yaml.load(yaml_file) - """ - r = dict() # results dictionary + # Get problem and model names - from config. + status_dict['problem'] = params['testing']['problem']['name'] + status_dict['model'] = params['model']['name'] - r['timestamp'] = os.path.basename(os.path.normpath(experiment_path)) + # Load checkpoint from model file. + chkpt = torch.load(os.path.join(experiment_path, 'models/model_best.pt'), + map_location=lambda storage, loc: storage) - # Load yaml file. 
To get model name, problem name and random seeds - with open(experiment_path + '/training_configuration.yaml', 'r') as yaml_file: - params = yaml.load(yaml_file) + status_dict['model_save_timestamp'] = '{0:%Y%m%d_%H%M%S}'.format(chkpt['model_timestamp']) + status_dict['training_terminal_status'] = chkpt['status'] + status_dict['training_terminal_status_timestamp'] = '{0:%Y%m%d_%H%M%S}'.format(chkpt['status_timestamp']) - r['model'] = params['model']['name'] - r['problem'] = params['testing']['problem']['name'] - r['seed_torch'] = params['training']['seed_torch'] - r['seed_numpy'] = params['training']['seed_numpy'] + # Create "empty" equivalent. + status_dict_empty = dict.fromkeys(status_dict.keys(), ' ') - # get all sub-directories paths in experiment_path: to detect test experiments paths - experiments_tests = [] + # Copy training status stats. + train_dict['training_configuration_filepath'] = os.path.join(experiment_path, 'training_configuration.yaml') + train_dict['training_start_timestamp'] = os.path.basename(os.path.normpath(experiment_path)) + train_dict['training_seed_torch'] = params['training']['seed_torch'] + train_dict['training_seed_numpy'] = params['training']['seed_numpy'] - for root, dirs, files in os.walk(experiment_path, topdown=True): - for name in dirs: - experiments_tests.append(os.path.join(root, name)) + # Copy the training statistics from the checkpoint and add the 'train_' prefix. + for key, value in chkpt['training_stats'].items(): + train_dict['training_{}'.format(key)] = value + # Create "empty" equivalent. + train_dict_empty = dict.fromkeys(train_dict.keys(), ' ') - # Keep only the folders that contain `testing.csv` - experiments_tests = [elem for elem in experiments_tests if os.path.isfile(elem + '/testing_statistics.csv')] + # Copy the validation statistics from the checkpoint and add the 'valid_' prefix. + for key, value in chkpt['validation_stats'].items(): + valid_dict['validation_{}'.format(key)] = value + # Create "empty" equivalent. + valid_dict_empty = dict.fromkeys(valid_dict.keys(), ' ') - # check if that the `testing.csv` files are not empty - experiments_tests = [elem for elem in experiments_tests if os.stat(elem + '/testing_statistics.csv').st_size > 24] + # Get all tests for a given training experiment. + experiments_tests = self.get_experiment_tests(experiment_path) - with open(os.path.join(experiment_path, '/validation_statistics.csv'), mode='r') as f: - valid_csv = csv.reader(f, delimiter=',') + list_test_dicts = [] - with open(os.path.join(experiment_path, '/training_statistics.csv'), mode='r') as f: - train_csv = csv.reader(f, delimiter=',') + if len(experiments_tests) > 0: + self.logger.info(' - Found {} test(s)'.format(len(experiments_tests))) - # get best train point - train_episode = train_csv.episode.values.astype(int) - train_loss = train_csv.loss.values.astype(float) + # "Expand" status, train and valid dicts by empty ones, prop. to the number of test folders. 
+ list_status_dicts = [status_dict, *[status_dict_empty for _ in range(len(experiments_tests) - 1)]] + list_train_dicts = [train_dict, *[train_dict_empty for _ in range(len(experiments_tests) - 1)]] + list_valid_dicts = [valid_dict, *[valid_dict_empty for _ in range(len(experiments_tests) - 1)]] - index_train_loss = np.argmin(train_loss) - r['best_train_ep'] = train_episode[index_train_loss] # episode index of lowest training loss - r['training_episodes_limit'] = train_episode[-1] - r['best_train_loss'] = train_loss[index_train_loss] # lowest training loss + # Get tests statistics. + for experiment_test_path in experiments_tests: + self.logger.info(' - Analyzing test from: {}'.format(experiment_test_path)) - if 'acc' in train_csv: - train_accuracy = train_csv.acc.values.astype(float) - r['best_train_acc'] = train_accuracy[index_train_loss] + # Create test dict: + test_dict = dict() + test_dict['test_configuration_filepath'] = os.path.join(experiment_test_path, 'testing_set_agg_statistics.yaml') + test_dict['test_start_timestamp'] = os.path.basename(os.path.normpath(experiment_test_path))[5:] - # best valid point - valid_episode = valid_csv.episode.values.astype(int) - valid_loss = valid_csv.loss.values.astype(float) + # Load yaml file and get random seeds. + with open(os.path.join(experiment_test_path, 'testing_configuration.yaml'), 'r') as yaml_file: + test_params = yaml.load(yaml_file) + # Get seeds. + test_dict['test_seed_torch'] = test_params['testing']['seed_torch'] + test_dict['test_seed_numpy'] = test_params['testing']['seed_numpy'] - index_val_loss = np.argmin(valid_loss) - r['best_valid_ep'] = valid_episode[index_val_loss] # episode index of lowest validation loss - r['best_valid_loss'] = valid_loss[index_val_loss] # lowest validation loss + # Load csv file and copy test statistics + with open(os.path.join(experiment_test_path, 'testing_set_agg_statistics.csv'), mode='r') as f: + # Open file. + test_reader = csv.DictReader(f) - if 'acc' in valid_csv: - valid_accuracy = valid_csv.acc.values.astype(float) - r['best_valid_accuracy'] = valid_accuracy[index_val_loss] + # Copy training statistics. + for row in test_reader: + for key, value in row.items(): + test_dict['test_{}'.format(key)] = value - # get test statistics - for test_idx, experiment in zip(range(1, self.nb_tests+1), experiments_tests): + list_test_dicts.append(test_dict) - with open(os.path.join(experiment_path, '/testing_statistics.csv'), mode='r') as f: - test_csv = csv.reader(f, delimiter=',') + else: + self.logger.info(' - Could not find any valid tests') + list_status_dicts = [status_dict] + list_train_dicts = [train_dict] + list_valid_dicts = [valid_dict] - # get average test loss - nb_episode = test_csv.episode.values.astype(int)[-1]+1 - losses = test_csv.loss.values.astype(float) + # Add "empty test entry" + list_test_dicts.append({}) - r['test_{}_average_loss'.format(test_idx)] = sum(losses)/nb_episode - r['test_{}_std_loss'.format(test_idx)] = np.std(losses) + # Return all dictionaries with lists + return list_status_dicts, list_train_dicts, list_valid_dicts, list_test_dicts - if 'acc' in test_csv: - accuracies = test_csv.acc.values.astype(float) - r['test_{}_average_acc'.format(test_idx)] = sum(accuracies) / nb_episode - r['test_{}_std_acc'.format(test_idx)] = np.std(accuracies) + @staticmethod + def merge_list_dicts(list_dicts): + """ + Merges a list of dictionaries by filling the missing fields with spaces into one dict. 
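# --- Illustrative sketch (annotation, not part of this patch) ---------------
# Expected behaviour of merge_list_dicts(), assuming the implementation that
# follows: every key present in any input dict appears in the result, mapped
# to a tuple of per-dict values, with ' ' filling the missing entries.
from miprometheus.grid_workers import GridAnalyzer

merged = GridAnalyzer.merge_list_dicts([{'loss': 0.10, 'acc': 0.90},
                                        {'loss': 0.20}])
# merged == {'loss': (0.10, 0.20), 'acc': (0.90, ' ')}   (key order may vary)
# ----------------------------------------------------------------------------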
+ + :param list_dicts: List of dictionaries, potentially containing different headers, which will be merged. + :type list_dicts: list - return r + :return: dict, resulting of the merge. - def run_grid_experiment(self): """ - Constructor for the ``GridAnalyzer``. + # Create a "unified" header. + header = set(k for d in list_dicts for k in d) + + # Create an "empty" dict from the unified header. + empty_dict = {k: ' ' for k in header} - Maps the grid analysis to CPU cores in the limit of the available cores. + # "Fill" all lists with empty gaps. + list_filled_dicts = [] + for i, _ in enumerate(list_dicts): + list_filled_dicts.append({**empty_dict, **(list_dicts[i])}) + # Zip lists of dicts. + final_dict = dict(zip(header, zip(*[d.values() for d in list_filled_dicts]))) + # Return the result. + return final_dict + + def run_grid_experiment(self): """ - # Run in as many threads as there are CPUs available to the script - with ThreadPool(processes=len(os.sched_getaffinity(0))) as pool: - func = partial(GridAnalyzer.run_experiment, self) - list_dict_exp = pool.map(func, self.experiments_list) + Collects four list of dicts from each experiment path contained in ``self.experiments_lists``. - exp_values = dict(zip(list_dict_exp[0], zip(*[d.values() for d in list_dict_exp]))) + Merges all them together and saves result to a single csv file. + + """ + try: + # Go through the experiments one by one and collect data. + list_statuses = [] + list_trains = [] + list_valids = [] + list_tests = [] + + for exp in self.experiments_list: + statuses, trains, valids, tests = self.run_experiment(exp) + list_statuses.extend(statuses) + list_trains.extend(trains) + list_valids.extend(valids) + list_tests.extend(tests) + + # Merge lists. + statuses = self.merge_list_dicts(list_statuses) + trains = self.merge_list_dicts(list_trains) + valids = self.merge_list_dicts(list_valids) + tests = self.merge_list_dicts(list_tests) + + # Merge everything into one big dictionary.. + exp_values = {**statuses, **trains, **valids, **tests} # create results file - results_file = os.path.join(self.directory_chckpnts, "{0:%Y%m%d_%H%M%S}_grid_analysis.csv".format(datetime.now())) + results_file = os.path.join(self.experiment_rootdir, + "{0:%Y%m%d_%H%M%S}_grid_analysis.csv".format(datetime.now())) with open(results_file, "w") as outfile: writer = csv.writer(outfile, delimiter=',') writer.writerow(exp_values.keys()) writer.writerows(zip(*exp_values.values())) - self.logger.info('Analysis done.') - self.logger.info('Results stored in {}.'.format(results_file)) + self.logger.info('Analysis finished') + self.logger.info('Results stored in {}.'.format(results_file)) + + except KeyboardInterrupt: + self.logger.info('Grid analysis interrupted!') def main(): """ - Entry point function for the ``GridAnalyzer``. + Entry point function for the :py:class:`miprometheus.grid_workers.GridAnalyzer`. """ grid_analyzer = GridAnalyzer() diff --git a/miprometheus/grid_workers/grid_tester_cpu.py b/miprometheus/grid_workers/grid_tester_cpu.py index a3e81d45..4292cab7 100644 --- a/miprometheus/grid_workers/grid_tester_cpu.py +++ b/miprometheus/grid_workers/grid_tester_cpu.py @@ -18,10 +18,10 @@ """ grid_tester_cpu.py: - - This file contains the implementation of a worker running the ``Tester`` on the results of a ``GridTrainer`` - using CPUs. + - This file contains the implementation of a worker running the :py:class:`miprometheus.workers.Tester` \ + on the results of a ``GridTrainer`` using CPUs. 
- - The input is a list of directories for each problem/model e.g. `experiments/serial_recall/dnc`, \ + - The main input is a list of directories for each problem/model e.g. `experiments/serial_recall/dnc`, \ and executes on every run of the model in that directory. """ @@ -40,16 +40,16 @@ class GridTesterCPU(GridWorker): """ Implementation of the Grid Tester running on CPUs. - Reuses the ``Tester`` to start one test experiment. + Reuses the :py:class:`miprometheus.workers.Tester` to start one test experiment. """ def __init__(self, name="GridTesterCPU", use_gpu=False): """ - Constructor for the ``GridTesterCPU``: + Constructor for the :py:class:`miprometheus.grid_workers.GridTesterCPU`: - Calls the base constructor to set the worker's name and add default command lines arguments, - - Adds some ``GridTrainer`` specific command line arguments. + - Adds some ``GridTester`` specific command line arguments. :param name: Name of the worker (DEFAULT: "GridTesterCPU"). :type name: str @@ -62,100 +62,111 @@ def __init__(self, name="GridTesterCPU", use_gpu=False): super(GridTesterCPU, self).__init__(name=name,use_gpu=use_gpu) # Get number_of_repetitions - self.parser.add_argument('--r', + self.parser.add_argument('--repeat', dest='experiment_repetitions', type=int, default=1, - help='Number of experiment repetitions to run for each model.' - ' (DEFAULT=1)') + help='Number of experiment repetitions to run for each model (DEFAULT=1).') # Get number_of_repetitions - self.parser.add_argument('--m', + self.parser.add_argument('--max_concur_runs', dest='max_concurrent_runs', type=int, default=-1, - help='Value limiting the number of concurently running experiments.' - 'The set limit will be truncated by number of available CPUs/GPUs.' - ' (DEFAULT=-1, meaning that it will be set to the number of CPUs/GPUs)') - + help='Value limiting the number of concurrently running experiments.' + 'The set limit will be truncated by number of available CPUs/GPUs.' + ' (DEFAULT=-1, meaning that it will be set to the number of CPUs/GPUs)') def setup_grid_experiment(self): """ Setups the overall grid of experiments: - - Calls the ``super(self).setup_experiment()`` to parse arguments, - + - Calls :py:func:`GridWorker.setup_grid_experiment()` to parse arguments, - Recursively creates the paths to the experiments folders, verifying that they are valid (e.g. \ - contain `validation_statistics.csv` and `training_statistics.csv`). - - - :param cuda: Whether to use cuda or not. Default to ``False``. - :type cuda: bool + they contain a saved model, `model_best.pt`). """ super(GridTesterCPU, self).setup_grid_experiment() # Check the presence of mip-tester script. if shutil.which('mip-tester') is None: - self.logger.error("Cannot localize the 'mip-tester' script! (hints: please use setup.py to install it)") + self.logger.error("Cannot localize the 'mip-tester' script! (hint: please use setup.py to install it)") exit(-1) - directory_chckpnts = self.flags.outdir + self.experiment_rootdir = self.flags.expdir + # Get grid settings. 
experiment_repetitions = self.flags.experiment_repetitions self.max_concurrent_runs = self.flags.max_concurrent_runs - # get all sub-directories paths in outdir, repeating according to flags.num + # get all sub-directories paths in expdir, repeating according to flags.experiment_repetitions self.experiments_list = [] for _ in range(experiment_repetitions): - for root, dirs, files in os.walk(directory_chckpnts, topdown=True): + for root, dirs, _ in os.walk(self.experiment_rootdir, topdown=True): for name in dirs: self.experiments_list.append(os.path.join(root, name)) - # Keep only the folders that contain validation.csv and training.csv - self.experiments_list = [elem for elem in self.experiments_list if os.path.isfile( - elem + '/validation_statistics.csv') and os.path.isfile(elem + '/training_statistics.csv')] + # Keep only the folders that contain best_model.pt in model subdirectory. + # We assume that training configuration is there as well. + self.experiments_list = [elem for elem in self.experiments_list + if os.path.isfile(elem + '/model_best.pt')] + + # Check if these are 'valid' folders, e.g. they contain a saved model + if len(self.experiments_list) == 0: + self.logger.error("There are no models in {} directory!".format(self.experiment_rootdir)) + exit(-2) - # check if the files are not empty - self.experiments_list = [elem for elem in self.experiments_list if os.stat( - elem + '/validation_statistics.csv').st_size > 24 and os.stat(elem + '/training_statistics.csv').st_size > 24] + # List folders. + exp_str = "Found the following models in {} directory:\n".format(self.experiment_rootdir) + exp_str += '='*80 + '\n' + for exp in self.experiments_list: + exp_str += " - {}/model_best.pt\n".format(exp) + exp_str += '='*80 + '\n' + self.logger.info(exp_str) self.logger.info('Number of experiments to run: {}'.format(len(self.experiments_list))) self.experiments_done = 0 + # Ask for confirmation - optional. + if self.flags.user_confirm: + try: + input('Press to confirm and start the grid of experiments\n') + except KeyboardInterrupt: + exit(0) + def run_grid_experiment(self): """ - Main function of the ``GridTesterCPU``. + Main function of the :py:class:`miprometheus.grid_workers.GridTesterCPU`. Maps the grid experiments to CPU cores in the limit of the maximum concurrent runs allowed or maximum\ available cores. """ - # Ask for confirmation - optional. - if self.flags.confirm: - input('Press any key to continue') + try: - # Check max number of child processes. - if self.max_concurrent_runs <= 0: # We need at least one proces! - max_processes = len(os.sched_getaffinity(0)) - else: - # Take into account the minimum value. - max_processes = min(len(os.sched_getaffinity(0)), self.max_concurrent_runs) - self.logger.info('Spanning experiments using {} CPU(s) concurrently.'.format(max_processes)) + # Check max number of child processes. + if self.max_concurrent_runs <= 0: # We need at least one process! + max_processes = self.get_available_cpus() + else: + # Take into account the minimum value. + max_processes = min(self.get_available_cpus(), self.max_concurrent_runs) + self.logger.info('Spanning experiments using {} CPU(s) concurrently'.format(max_processes)) - # Run in as many threads as there are CPUs available to the script. - with ThreadPool(processes=max_processes) as pool: - func = partial(GridTesterCPU.run_experiment, self, prefix="") - pool.map(func, self.experiments_list) + # Run in as many threads as there are CPUs available to the script. 
+ with ThreadPool(processes=max_processes) as pool: + func = partial(GridTesterCPU.run_experiment, self, prefix="") + pool.map(func, self.experiments_list) - self.logger.info('Grid test experiments finished.') + self.logger.info('Grid testing finished') + except KeyboardInterrupt: + self.logger.info('Grid testing interrupted!') def run_experiment(self, experiment_path: str, prefix=""): """ - Runs a test on the specified model (experiment_path) using the ``Tester``. + Runs a test on the specified model (experiment_path) using the :py:class:`miprometheus.workers.Tester`. :param experiment_path: Path to an experiment folder containing a trained model. :type experiment_path: str @@ -167,21 +178,20 @@ def run_experiment(self, experiment_path: str, prefix=""): - Visualization is deactivated to avoid any user interaction. - Command-line arguments such as the logging interval (``--li``) and log level (``--ll``) are passed \ - to the used ``Trainer``. + to the :py:class:`miprometheus.workers.Tester`. """ - path_to_model = os.path.join(experiment_path, 'models/model_best.pt') - - # check if models list is empty - if not os.path.isfile(path_to_model): - self.logger.warning('The indicated model {} does not exist on file.'.format(path_to_model)) + try: - else: + path_to_model = os.path.join(experiment_path, 'model_best.pt') + self.logger.warning(path_to_model) # Run the test - command_str = "{}mip-tester --model {} --li {} --ll {}".format(prefix, path_to_model, - self.flags.logging_interval, - self.flags.log_level) + command_str = "{}mip-tester --model {} --li {} --ll {}".format( + prefix, path_to_model, + self.flags.logging_interval, + self.flags.log_level) + # Add gpu flag if required. if self.app_state.use_CUDA: command_str += " --gpu " @@ -191,17 +201,20 @@ def run_experiment(self, experiment_path: str, prefix=""): result = subprocess.run(command_str.split(" "), stdout=devnull) self.experiments_done += 1 self.logger.info("Finished: {}".format(command_str)) - print() + self.logger.info( 'Number of experiments done: {}/{}.'.format(self.experiments_done, len(self.experiments_list))) if result.returncode != 0: self.logger.info("Testing exited with code: {}".format(result.returncode)) + except KeyboardInterrupt: + self.logger.info('Grid testing interrupted!') + def main(): """ - Entry point function for the ``GridTesterCPU``. + Entry point function for the :py:class:`miprometheus.grid_workers.GridTesterCPU`. """ grid_tester_cpu = GridTesterCPU() diff --git a/miprometheus/grid_workers/grid_tester_gpu.py b/miprometheus/grid_workers/grid_tester_gpu.py index fab9ae8a..7751266a 100644 --- a/miprometheus/grid_workers/grid_tester_gpu.py +++ b/miprometheus/grid_workers/grid_tester_gpu.py @@ -18,8 +18,8 @@ """ grid_tester_gpu.py: - - This file contains the implementation of a worker running the ``Tester`` on the results of a ``GridTrainer``\ - using CPUs. + - This file contains the implementation of a worker running the :py:class:`miprometheus.workers.Tester` \ + on the results of a ``GridTrainer`` using GPUs. - The input is a list of directories for each problem/model e.g. `experiments/serial_recall/dnc`, \ and executes on every run of the model in that directory. @@ -41,17 +41,17 @@ class GridTesterGPU(GridTesterCPU): """ Implementation of the ``GridTester`` running on GPUs. - Reuses the ``Tester`` to start one test experiment. + Reuses the :py:class:`miprometheus.workers.Tester` to start one test experiment. - Inherits from ``GridTesterCPU`` as the constructor is identical. 
+ Inherits from :py:class:`miprometheus.grid_workers.GridTesterCPU` as the constructor is identical. """ def __init__(self, name="GridTesterGPU", use_gpu=True): """ - Constructor for the ``GridTesterGPU``: + Constructor for the :py:class:`miprometheus.grid_workers.GridTesterGPU`: - - Calls the constructor of ``GridTesterCPU`` as it is identical. + - Calls the constructor of :py:class:`miprometheus.grid_workers.GridTesterCPU` as it is identical. :param name: Name of the worker (DEFAULT: "GridTesterGPU"). @@ -68,69 +68,70 @@ def setup_grid_experiment(self): """ Setups a specific experiment. - - Calls the ``super(self).setup_experiment()`` to parse arguments, parse config files etc. + - Calls :py:func:`GridTesterCPU.setup_grid_experiment()` to parse arguments, parse config files etc. - Checks the presence of CUDA-compatible devices. """ super(GridTesterGPU, self).setup_grid_experiment() + # Check the presence of the CUDA-compatible devices. - if (torch.cuda.device_count() == 0): + if torch.cuda.device_count() == 0: self.logger.error("Cannot use GPU as there are no CUDA-compatible devices present in the system!") exit(-1) - def run_grid_experiment(self): """ - Main function of the ``GridTesterGPU``. + Main function of the :py:class:`miprometheus.grid_workers.GridTesterGPU`. - Maps the grid experiments to CPU cores in the limit of the maximum concurrent runs allowed or maximum\ - available cores. + Maps the grid experiments to CUDA device in the limit of the maximum concurrent runs allowed or maximum \ + available devices. """ - # Ask for confirmation - optional. - if self.flags.confirm: - input('Press any key to continue') - - # Check the presence of cuda-gpupick - if shutil.which('cuda-gpupick') is not None: - prefix_str = "cuda-gpupick -n1 " - else: - self.logger.warning("Cannot localize the 'cuda-gpupick' script, disabling it") - prefix_str = '' - - # Check max number of child processes. - if self.max_concurrent_runs <= 0: # We need at least one proces! - max_processes = torch.cuda.device_count() - else: - # Take into account the minimum value. - max_processes = min(torch.cuda.device_count(), self.max_concurrent_runs) - self.logger.info('Spanning experiments using {} GPU(s) concurrently.'.format(max_processes)) - - # Run in as many threads as there are GPUs available to the script. - with ThreadPool(processes=max_processes) as pool: - # This contains a list of `AsyncResult` objects. To check if completed and get result. - thread_results = [] - - for task in self.experiments_list: - func = partial(GridTesterGPU.run_experiment, self, prefix=prefix_str) - thread_results.append(pool.apply_async(func, (task,))) - - # Check every 3 seconds if there is a (supposedly) free GPU to start a task on - sleep(3) - while [r.ready() for r in thread_results].count(False) >= torch.cuda.device_count(): + try: + + # Check the presence of cuda-gpupick + if shutil.which('cuda-gpupick') is not None: + prefix_str = "cuda-gpupick -n1 " + else: + self.logger.warning("Cannot localize the 'cuda-gpupick' script, disabling it") + prefix_str = '' + + # Check max number of child processes. + if self.max_concurrent_runs <= 0: # We need at least one process! + max_processes = torch.cuda.device_count() + else: + # Take into account the minimum value. + max_processes = min(torch.cuda.device_count(), self.max_concurrent_runs) + self.logger.info('Spanning experiments using {} GPU(s) concurrently'.format(max_processes)) + + # Run in as many threads as there are GPUs available to the script. 
+ with ThreadPool(processes=max_processes) as pool: + # This contains a list of `AsyncResult` objects. To check if completed and get result. + thread_results = [] + + for task in self.experiments_list: + func = partial(GridTesterGPU.run_experiment, self, prefix=prefix_str) + thread_results.append(pool.apply_async(func, (task,))) + + # Check every 3 seconds if there is a (supposedly) free GPU to start a task on sleep(3) + while [r.ready() for r in thread_results].count(False) >= torch.cuda.device_count(): + sleep(3) + + # Equivalent of what would usually be called "join" for threads + for r in thread_results: + r.wait() - # Equivalent of what would usually be called "join" for threads - for r in thread_results: - r.wait() + self.logger.info('Grid testing finished') - self.logger.info('Grid training experiments finished.') + except KeyboardInterrupt: + self.logger.info('Grid testing interrupted!') def main(): """ - Entry point function for the ``GridTesterGPU``. + Entry point function for the :py:class:`miprometheus.grid_workers.GridTesterGPU`. """ grid_tester_gpu = GridTesterGPU() diff --git a/miprometheus/grid_workers/grid_trainer_cpu.py b/miprometheus/grid_workers/grid_trainer_cpu.py index 1b5f0c96..ae14c753 100644 --- a/miprometheus/grid_workers/grid_trainer_cpu.py +++ b/miprometheus/grid_workers/grid_trainer_cpu.py @@ -19,9 +19,10 @@ grid_trainer_cpu.py: - This file contains the implementation of a worker spanning a grid of training experiments on \ - a collection of CPUs. It works by loading a template yaml file, modifying the resulting dict, and dumping \ - that as yaml into a temporary file. The specified ``Trainer`` is then executed using the temporary yaml \ - file as the task. This grid trainer will run as many concurrent jobs as possible. + a collection of CPUs. + - It works by loading a template yaml file, modifying the resulting dict, and dumping \ + that as yaml into a temporary file. The specified :py:class:`miprometheus.workers.Trainer` is then \ + executed using the temporary yaml file as the task. This grid trainer will run as many concurrent jobs as possible. """ __author__ = "Alexis Asseman, Ryan McAvoy, Tomasz Kornuta, Vincent Marois" @@ -43,13 +44,14 @@ class GridTrainerCPU(GridWorker): """ Grid Worker managing several training experiments on CPUs. - Reuses a ``Trainer`` (can specify the ``classic`` one or the ``flexible`` one) to start one experiment. + Reuses a :py:class:`miprometheus.workers.Trainer` (can specify :py:class:`miprometheus.workers.OfflineTrainer` \ + or :py:class:`miprometheus.workers.OnlineTrainer`) to start one experiment. """ def __init__(self, name="GridTrainerCPU", use_gpu=False): """ - Constructor for the ``GridTrainerCPU``: + Constructor for the :py:class:`miprometheus.grid_workers.GridTrainerCPU`: - Calls the base constructor to set the worker's name and add default command lines arguments, - Adds some ``GridTrainer`` specific command line arguments. @@ -70,34 +72,43 @@ def __init__(self, name="GridTrainerCPU", use_gpu=False): type=str, default='', help='Name of the configuration file(s) to be loaded. 
' - 'If specifying more than one file, they must be separated with coma ",".') + 'If specifying more than one file, they must be separated with coma ","') + + self.parser.add_argument('--trainer', + dest='trainer', + type=str, + default='', + help='Indicate which Trainer will be used (DEFAULT: '' => mip-offline-trainer).') + + + self.parser.add_argument('--savetag', + dest='savetag', + type=str, + default='', + help='Additional tag for the (output) experiment directory.') - self.parser.add_argument('--online_trainer', - dest='online_trainer', - action='store_true', - help='Select the OnLineTrainer instead of the default OffLineTrainer.') self.parser.add_argument('--tensorboard', action='store', dest='tensorboard', choices=[0, 1, 2], type=int, help="If present, enable logging to TensorBoard. Available log levels:\n" - "0: Log the collected statistics.\n" - "1: Add the histograms of the model's biases & weights (Warning: Slow).\n" + "0: Log the collected statistics\n" + "1: Add the histograms of the model's biases & weights (Warning: Slow)\n" "2: Add the histograms of the model's biases & weights gradients " - "(Warning: Even slower).") + "(Warning: Even slower)") def setup_grid_experiment(self): """ Setups a specific experiment. - - Calls the ``super(self).setup_experiment()`` to parse arguments, sets the 3 default sections \ - (training / validation / test) and sets their dataloaders params. + - Calls :py:func:`GridWorker.setup_grid_experiment()` to parse arguments, sets the 3 default sections \ + (training / validation / test) and sets their :py:class:`torch.utils.data.DataLoader` params. - Verifies that the specified config file is valid, - Parses it and recursively creates the configurations files for the grid tasks, overwriting \ - specific sections if indicated (`grid_overwrite` and/or `overwrite` (task specific), + specific sections if indicated: `grid_overwrite` and/or `overwrite` (task specific), - Creates the output dir. @@ -107,12 +118,12 @@ def setup_grid_experiment(self): # Check if config file was selected. if self.flags.config == '': - print('Please pass grid configuration file as --c parameter.') + print('Please pass grid configuration file as --c parameter') exit(-1) # Check if file exists. if not os.path.isfile(self.flags.config): - print('Error: Grid configuration file {} does not exist.'.format(self.flags.config)) + print('Error: Grid configuration file {} does not exist'.format(self.flags.config)) exit(-2) try: # open file and get parameter dictionary. @@ -120,27 +131,39 @@ def setup_grid_experiment(self): grid_dict = yaml.safe_load(stream) except yaml.YAMLError as e: - print("Error: Could not properly parse the {} grid configuration file.".format(self.flags.config)) - print('yaml.YAMLERROR:', e) + print("Error: Could not properly parse the {} grid configuration file".format(self.flags.config)) + print('yaml.YAMLERROR: ', e) exit(-3) - # Check the presence of mip-*-trainer scripts. - if self.flags.online_trainer: - if shutil.which('mip-online-trainer') is None: - self.logger.error("Cannot localize the 'mip-online-trainer' script! (hints: please use setup.py to install it)") - exit(-4) + # Set trainer. + if self.flags.trainer != '': + self.trainer = self.flags.trainer else: - if shutil.which('mip-offline-trainer') is None: - self.logger.error("Cannot localize the 'mip-offline-trainer' script! (hints: please use setup.py to install it)") - exit(-4) + # Try to read from config. 
+ try: + self.trainer = grid_dict['grid_settings']['trainer'] + except KeyError: + # Set offline trainer as default. + self.trainer = 'mip-offline-trainer' + + # Check it user indicated a valid trainer. + if self.trainer not in ['mip-offline-trainer', 'mip-online-trainer']: + self.logger.error("Indicated '{}' does not exists!".format(self.trainer)) + exit(-4) + + # Check the presence of mip-*-trainer scripts. + if shutil.which(self.trainer) is None: + self.logger.error("Cannot localize the '{}}' script! " + "(hint: please use setup.py to install it)".format(self.trainer)) + exit(-5) # Get grid settings. try: experiment_repetitions = grid_dict['grid_settings']['experiment_repetitions'] self.max_concurrent_runs = grid_dict['grid_settings']['max_concurrent_runs'] except KeyError: - print("Error: The 'grid_settings' section must define 'experiment_repetitions' and 'max_concurrent_runs'.") - exit(-5) + print("Error: The 'grid_settings' section must define 'experiment_repetitions' and 'max_concurrent_runs'") + exit(-6) # Check the presence of grid_overwrite section. if 'grid_overwrite' not in grid_dict: @@ -153,8 +176,8 @@ def setup_grid_experiment(self): # Check the presence of the tasks section. if 'grid_tasks' not in grid_dict: - print("Error: Grid configuration is lacking the 'grid_tasks' section.") - exit(-6) + print("Error: Grid configuration is lacking the 'grid_tasks' section") + exit(-7) # Create temporary file param_interface_file = NamedTemporaryFile(mode='w', delete=False) @@ -187,7 +210,7 @@ def setup_grid_experiment(self): except KeyError: pass - # at this point, configs should contains the str of config file(s) corresponding to the grid_tasks. + # at this point, configs should contain the str of config file(s) corresponding to the grid_tasks. # Create list of experiments, repeat the ones that are required. self.experiments_list = [] @@ -198,49 +221,56 @@ def setup_grid_experiment(self): self.experiments_done = 0 # create experiment directory label of the day - self.outdir_str = self.flags.outdir + '_{0:%Y%m%d_%H%M%S}'.format(datetime.now()) + self.expdir_str = self.flags.expdir + '_{0:%Y%m%d_%H%M%S}'.format(datetime.now()) # add savetag if self.flags.savetag != '': - self.outdir_str = self.outdir_str + "_" + self.flags.savetag + '/' + self.expdir_str = self.expdir_str + "_" + self.flags.savetag + '/' + self.logger.info('Setting experiment directory to: {}'.format(self.expdir_str)) # Prepare output paths for logging while True: # Dirty fix: if log_dir already exists, wait for 1 second and try again try: - os.makedirs(self.outdir_str, exist_ok=False) + os.makedirs(self.expdir_str, exist_ok=False) except FileExistsError: sleep(1) else: break + # Ask for confirmation - optional. + if self.flags.user_confirm: + try: + input('Press to confirm and start the grid of experiments\n') + except KeyboardInterrupt: + exit(0) def run_grid_experiment(self): """ - Main function of the ``GridTrainerCPU``. + Main function of the :py:class:`miprometheus.grid_workers.GridTrainerCPU`. - Maps the grid experiments to CPU cores in the limit of the maximum concurrent runs allowed or maximum\ - available cores. + Maps the grid experiments to CPU cores in the limit of the maximum concurrent runs allowed or maximum \ + available cores. """ - # Ask for confirmation - optional. - if self.flags.confirm: - input('Press any key to continue') + try: - # Check max number of child processes. - if self.max_concurrent_runs <= 0: # We need at least one proces! 
- max_processes = len(os.sched_getaffinity(0)) - else: - # Take into account the minimum value. - max_processes = min(len(os.sched_getaffinity(0)), self.max_concurrent_runs) - self.logger.info('Spanning experiments using {} CPU(s) concurrently.'.format(max_processes)) + # Check max number of child processes. + if self.max_concurrent_runs <= 0: # We need at least one process! + max_processes = self.get_available_cpus() + else: + # Take into account the minimum value. + max_processes = min(self.get_available_cpus(), self.max_concurrent_runs) + self.logger.info('Spanning experiments using {} CPU(s) concurrently'.format(max_processes)) - # Run in as many threads as there are CPUs available to the script. - with ThreadPool(processes=max_processes) as pool: - func = partial(GridTrainerCPU.run_experiment, self, prefix="") - pool.map(func, self.experiments_list) + # Run in as many threads as there are CPUs available to the script. + with ThreadPool(processes=max_processes) as pool: + func = partial(GridTrainerCPU.run_experiment, self, prefix="") + pool.map(func, self.experiments_list) - self.logger.info('Grid training experiments finished.') + self.logger.info('Grid training finished') + except KeyboardInterrupt: + self.logger.info('Grid training interrupted!') def run_experiment(self, experiment_configs: str, prefix=""): """ @@ -254,49 +284,52 @@ def run_experiment(self, experiment_configs: str, prefix=""): :type prefix: str - ..note:: + .. note:: - - Not using the ``--model`` argument of the ``Trainer`` to load a pretrained model. - Visualization is deactivated to avoid any user interaction. - Command-line arguments such as the logging interval (``--li``), tensorboard (``--t``) and log level \ - (``--ll``) are passed to the used ``Trainer``. + (``--ll``) are passed to the used :py:class:`miprometheus.workers.Trainer` + - Not using the `--model` command-line argument of the :py:class:`miprometheus.workers.Trainer` \ + to load a pretrained model. Please use instead the configuration parameter `load` in the `model` section. """ - # set the command to be executed using the indicated Trainer - if self.flags.online_trainer: - command_str = "{}mip-online-trainer".format(prefix) - else: - command_str = "{}mip-offline-trainer".format(prefix) + try: + + # Set the command to be executed using the indicated trainer and prefix. + command_str = "{}{}".format(prefix,self.trainer) + + # Add gpu flag if required. + if self.app_state.use_CUDA: + command_str += " --gpu " - # Add gpu flag if required. - if self.app_state.use_CUDA: - command_str += " --gpu " + # Add experiment config(s). + command_str = command_str + " --c {0} --expdir " + self.expdir_str + ' --li ' + str(self.flags.logging_interval) \ + + ' --ll ' + str(self.flags.log_level) + command_str = command_str.format(experiment_configs) - # Add experiment config(s). - command_str = command_str + " --c {0} --outdir " + self.outdir_str + ' --li ' + str(self.flags.logging_interval) \ - + ' --ll ' + str(self.flags.log_level) - command_str = command_str.format(experiment_configs) + # Add tensorboard flag. + if self.flags.tensorboard is not None: + command_str += " --t " + str(self.flags.tensorboard) - # Add tensorboard flag. 
- if self.flags.tensorboard is not None: - command_str += " --t " + str(self.flags.tensorboard) + self.logger.info("Starting: {}".format(command_str)) + with open(os.devnull, 'w') as devnull: + result = subprocess.run(command_str.split(" "), stdout=devnull) + self.experiments_done += 1 + self.logger.info("Finished: {}".format(command_str)) - self.logger.info("Starting: {}".format(command_str)) - with open(os.devnull, 'w') as devnull: - result = subprocess.run(command_str.split(" "), stdout=devnull) - self.experiments_done += 1 - self.logger.info("Finished: {}".format(command_str)) + self.logger.info('Number of experiments done: {}/{}.'.format(self.experiments_done, len(self.experiments_list))) - self.logger.info('Number of experiments done: {}/{}.'.format(self.experiments_done, len(self.experiments_list))) + if result.returncode != 0: + self.logger.info("Training exited with code: {}".format(result.returncode)) - if result.returncode != 0: - self.logger.info("Training exited with code: {}".format(result.returncode)) + except KeyboardInterrupt: + self.logger.info('Grid training interrupted!') def main(): """ - Entry point function for the ``GridTrainerCPU``. + Entry point function for the :py:class:`miprometheus.grid_workers.GridTrainerCPU`. """ grid_trainer_cpu = GridTrainerCPU() diff --git a/miprometheus/grid_workers/grid_trainer_gpu.py b/miprometheus/grid_workers/grid_trainer_gpu.py index 2b8c088e..837b9911 100644 --- a/miprometheus/grid_workers/grid_trainer_gpu.py +++ b/miprometheus/grid_workers/grid_trainer_gpu.py @@ -18,10 +18,11 @@ """ grid_trainer_gpu.py: - - This file contains the implementation of a worker spanning a grid of training experiments on\ - a collection of GPUs. It works by loading a template yaml file, modifying the resulting dict, and dumping\ - that as yaml into a temporary file. The ``Trainer`` is then executed using the temporary yaml file as the task.\ - It will run as many concurrent jobs as possible. + - This file contains the implementation of a worker spanning a grid of training experiments on \ + a collection of CUDA devices. + - It works by loading a template yaml file, modifying the resulting dict, and dumping \ + that as yaml into a temporary file. The specified :py:class:`miprometheus.workers.Trainer` is then \ + executed using the temporary yaml file as the task. This grid trainer will run as many concurrent jobs as possible. """ @@ -40,16 +41,18 @@ class GridTrainerGPU(GridTrainerCPU): """ Grid Worker managing several training experiments on GPUs. - Reuses a ``Trainer`` (can specify the ``classic`` one or the ``flexible`` one) to start one experiment. + Reuses a :py:class:`miprometheus.workers.Trainer` (can specify :py:class:`miprometheus.workers.OfflineTrainer` \ + or :py:class:`miprometheus.workers.OnlineTrainer`) to start one experiment. - Inherits from ``GridTrainerCPU`` as the constructor & ``setup_grid_experiment`` are identical. + Inherits from :py:class:`miprometheus.grid_workers.GridTrainerCPU` as the constructor & \ + :py:func:`GridTrainerCPU.setup_grid_experiment` are identical. """ def __init__(self, name="GridTrainerGPU", use_gpu=True): """ - Constructor for the ``GridTrainerGPU``: + Constructor for the :py:class:`miprometheus.grid_workers.GridTrainerGPU`: - - Calls the constructor of ``GridTrainerCPU`` as it is identical. + - Calls the constructor of :py:class:`miprometheus.grid_workers.GridTrainerCPU` as it is identical. :param name: Name of the worker (DEFAULT: "GridTrainerGPU"). 
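# --- Illustrative sketch (annotation, not part of this patch) ---------------
# The scheduling pattern shared by the GPU grid workers in this diff: tasks are
# submitted asynchronously to a thread pool sized to the number of CUDA devices
# (or to max_concurrent_runs, whichever is smaller), and the submission loop
# polls every 3 seconds until a device is presumably free again. `worker` and
# `tasks` stand for a grid-worker instance and its experiments list.
from functools import partial
from multiprocessing.pool import ThreadPool
from time import sleep

import torch


def map_experiments_to_gpus(worker, tasks, prefix="", max_concurrent_runs=-1):
    # We need at least one process; otherwise cap at the number of devices.
    if max_concurrent_runs <= 0:
        max_processes = torch.cuda.device_count()
    else:
        max_processes = min(torch.cuda.device_count(), max_concurrent_runs)

    with ThreadPool(processes=max_processes) as pool:
        thread_results = []
        for task in tasks:
            func = partial(worker.run_experiment, prefix=prefix)
            thread_results.append(pool.apply_async(func, (task,)))
            # Wait until a (supposedly) free GPU is available before submitting more.
            sleep(3)
            while [r.ready() for r in thread_results].count(False) >= max_processes:
                sleep(3)
        # Equivalent of "join" for the submitted tasks.
        for r in thread_results:
            r.wait()
# ----------------------------------------------------------------------------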
@@ -62,73 +65,73 @@ def __init__(self, name="GridTrainerGPU", use_gpu=True): # Call the base constructor. super(GridTrainerGPU, self).__init__(name=name,use_gpu=use_gpu) - def setup_grid_experiment(self): """ Setups a specific experiment. - - Calls the ``super(self).setup_experiment()`` to parse arguments, parse config files etc. + - Calls :py:func:`GridTrainerGPU.setup_grid_experiment()` to parse arguments, parse config files etc. - Checks the presence of CUDA-compatible devices. """ super(GridTrainerGPU, self).setup_grid_experiment() + # Check the presence of the CUDA-compatible devices. - if (torch.cuda.device_count() == 0): + if torch.cuda.device_count() == 0: self.logger.error("Cannot use GPU as there are no CUDA-compatible devices present in the system!") exit(-1) - def run_grid_experiment(self): """ - Main function of the ``GridTrainerGPU``. + Main function of the :py:class:`miprometheus.grid_workers.GridTrainerGPU`. Maps the grid experiments to CUDA devices in the limit of the maximum concurrent runs allowed. """ - # Ask for confirmation - optional. - if self.flags.confirm: - input('Press any key to continue') - - # Check the presence of cuda-gpupick - if shutil.which('cuda-gpupick') is not None: - prefix_str = "cuda-gpupick -n1 " - else: - self.logger.warning("Cannot localize the 'cuda-gpupick' script, disabling it") - prefix_str = '' - - # Check max number of child processes. - if self.max_concurrent_runs <= 0: # We need at least one proces! - max_processes = torch.cuda.device_count() - else: - # Take into account the minimum value. - max_processes = min(torch.cuda.device_count(), self.max_concurrent_runs) - self.logger.info('Spanning experiments using {} GPU(s) concurrently.'.format(max_processes)) - - # Run in as many threads as there are GPUs available to the script. - with ThreadPool(processes=max_processes) as pool: - # This contains a list of `AsyncResult` objects. To check if completed and get result. - thread_results = [] - - for task in self.experiments_list: - func = partial(GridTrainerGPU.run_experiment, self, prefix=prefix_str) - thread_results.append(pool.apply_async(func, (task,))) - - # Check every 3 seconds if there is a (supposedly) free GPU to start a task on - sleep(3) - while [r.ready() for r in thread_results].count(False) >= max_processes: + try: + + # Check the presence of cuda-gpupick + if shutil.which('cuda-gpupick') is not None: + prefix_str = "cuda-gpupick -n1 " + else: + self.logger.warning("Cannot localize the 'cuda-gpupick' script, not using it.") + prefix_str = '' + + # Check max number of child processes. + if self.max_concurrent_runs <= 0: # We need at least one process! + max_processes = torch.cuda.device_count() + else: + # Take into account the minimum value. + max_processes = min(torch.cuda.device_count(), self.max_concurrent_runs) + self.logger.info('Spanning experiments using {} GPU(s) concurrently.'.format(max_processes)) + + # Run in as many threads as there are GPUs available to the script. + with ThreadPool(processes=max_processes) as pool: + # This contains a list of `AsyncResult` objects. To check if completed and get result. 
+ thread_results = [] + + for task in self.experiments_list: + func = partial(GridTrainerGPU.run_experiment, self, prefix=prefix_str) + thread_results.append(pool.apply_async(func, (task,))) + + # Check every 3 seconds if there is a (supposedly) free GPU to start a task on sleep(3) + while [r.ready() for r in thread_results].count(False) >= max_processes: + sleep(3) + + # Equivalent of what would usually be called "join" for threads + for r in thread_results: + r.wait() - # Equivalent of what would usually be called "join" for threads - for r in thread_results: - r.wait() + self.logger.info('Grid training finished') - self.logger.info('Grid training experiments finished.') + except KeyboardInterrupt: + self.logger.info('Grid training interrupted!') def main(): """ - Entry point function for the ``GridTrainerGPU``. + Entry point function for the :py:class:`miprometheus.grid_workers.GridTrainerGPU`. """ grid_trainer_gpu = GridTrainerGPU() diff --git a/miprometheus/grid_workers/grid_worker.py b/miprometheus/grid_workers/grid_worker.py index 57310fbf..f3790824 100644 --- a/miprometheus/grid_workers/grid_worker.py +++ b/miprometheus/grid_workers/grid_worker.py @@ -18,15 +18,21 @@ """ grid_worker.py: - - Contains the definition of the ``GridWorker`` class, base for all grid workers, such as ``GridTrainerCPU`` \ - & ``GridAnalyzer``. These grid workers do not inherit from ``Worker``, as they are different in behavior. \ - Rather, they reuse the base workers to manage grid of experiments (by calling them using the command lines). + - Contains the definition of the :py:class:`miprometheus.grid_workers.GridWorker` class, \ + base for all grid workers, such as :py:class:`miprometheus.grid_workers.GridTrainerCPU` \ + & :py:class:`miprometheus.grid_workers.GridAnalyzer`. + + - These grid workers do not inherit from :py:class:`miprometheus.workers.Worker`, as they are different \ + in behavior. Rather, they reuse the base workers to manage grid of experiments (by calling them using the commands). - This class also contains the definition of the default command line arguments of the grid workers. """ __author__ = "Vincent Marois & Tomasz Kornuta" +import os +import psutil + import logging import argparse from abc import abstractmethod @@ -63,7 +69,8 @@ def __init__(self, name="GridWorker", use_gpu=False): :type name: str :param use_gpu: Indicates whether the worker should use GPU or not. Value coming from the subclasses \ - (e.g. ``GridTrainerCPU`` vs ``GridTrainerGPU``) (DEFAULT: False). + (e.g. :py:class:`miprometheus.grid_workers.GridTrainerCPU` vs \ + :py:class:`miprometheus.grid_workers.GridTrainerGPU`) (DEFAULT: False). :type use_gpu: bool """ @@ -106,20 +113,14 @@ def __init__(self, name="GridWorker", use_gpu=False): # Add arguments to the specific parser. # These arguments will be shared by all grid workers. - self.parser.add_argument('--outdir', - dest='outdir', + self.parser.add_argument('--expdir', + dest='expdir', type=str, default="./experiments", - help='Path to the global output directory where the experiments folders ' + help='Path to the directory where the experiments folders ' 'will be / are stored. Affects all grid experiments.' 
' (DEFAULT: ./experiments)')
 
-        self.parser.add_argument('--savetag',
-                                 dest='savetag',
-                                 type=str,
-                                 default='',
-                                 help='Additional tag for the global output directory.')
-
         self.parser.add_argument('--ll',
                                  action='store',
                                  dest='log_level',
@@ -133,15 +134,15 @@ def __init__(self, name="GridWorker", use_gpu=False):
                                  default=100,
                                  type=int,
                                  help='Statistics logging interval. Will impact logging to the logger and exporting to '
-                                      'TensorBoard for the experiments. Do not affect the grid worker. '
+                                      'TensorBoard for the experiments. Does not affect the grid worker itself. '
                                       'Writing to the csv file is not impacted (interval of 1).'
                                       ' (Default: 100, i.e. logs every 100 episodes).')
 
         self.parser.add_argument('--agree',
-                                 dest='confirm',
+                                 dest='user_confirm',
                                  action='store_true',
                                  help='Request user confirmation before starting the grid experiment.'
-                                 ' (Default: False)')
+                                      ' (Default: False)')
 
     def setup_grid_experiment(self):
         """
@@ -152,7 +153,7 @@ def setup_grid_experiment(self):
         - Parses command line arguments,
 
         - Sets the 3 default sections (training / validation / test) for the param registry, \
-        sets seeds to unspecified and disable multiprocessing. Also saves the specified ``cuda`` key.
+        sets seeds to unspecified and disables multiprocessing.
 
         .. note::
 
@@ -171,7 +172,7 @@ def setup_grid_experiment(self):
         self.params.add_default_params({"validation": {}})
         self.params.add_default_params({"testing": {}})
 
-        # set seeds to undefined (-1), pass CUDA value and deactivate multiprocessing for `DataLoader`.
+        # set seeds to undefined (-1) and deactivate multiprocessing for `DataLoader`.
         # It is important not to set the seeds here as they would be identical for all experiments.
         self.params["training"].add_default_params({"seed_numpy": -1,
                                                     "seed_torch": -1,
@@ -186,7 +187,8 @@ def setup_grid_experiment(self):
     @abstractmethod
     def run_grid_experiment(self):
         """
-        Main function of the ``GridWorker``, which essentially maps an experiment to available core or device.
+        Main function of the :py:class:`miprometheus.grid_workers.GridWorker`, which essentially maps \
+        an experiment to an available core or device.
 
         .. note::
 
@@ -194,3 +196,20 @@ def run_grid_experiment(self):
 
 
         """
+
+    def get_available_cpus(self):
+        """
+        Returns the number of available CPUs on the current machine.
+        """
+
+        # Check scheduler for number of available cpus - if OS offers that!
+        if hasattr(os, 'sched_getaffinity'):
+            return len(os.sched_getaffinity(0))
+
+        proc = psutil.Process()
+        # cpu_affinity() is only available on Linux, Windows and FreeBSD
+        if hasattr(proc, 'cpu_affinity'):
+            return len(proc.cpu_affinity())
+
+        # Simply return CPU count
+        return psutil.cpu_count()
diff --git a/miprometheus/models/encoder_solver/mae_cell.py b/miprometheus/models/encoder_solver/mae_cell.py
index 76a59f19..4c680bcb 100644
--- a/miprometheus/models/encoder_solver/mae_cell.py
+++ b/miprometheus/models/encoder_solver/mae_cell.py
@@ -127,7 +127,7 @@ def save(self, model_dir, stat_obj, is_best_model, save_intermediate):
             'name': 'MAE controller and interface',
             'ctrl_dict': self.controller.state_dict(),
             'interface_dict': self.interface.state_dict(),
-            'stats': stat_obj.statistics
+            'stats': stat_obj.export_to_checkpoint()
         }
 
         # Save the intermediate checkpoint.
diff --git a/miprometheus/models/model.py b/miprometheus/models/model.py
index 8614bfe5..f77916e5 100644
--- a/miprometheus/models/model.py
+++ b/miprometheus/models/model.py
@@ -148,6 +148,7 @@ def __init__(self, params, problem_default_values_={}):
 
         # Initialization of best loss - as INF.
self.best_loss = np.inf + self.best_status = "Unknown" def handshake_definitions(self, problem_data_definitions_): """ @@ -329,46 +330,37 @@ def save(self, model_dir, training_status, training_stats, validation_stats): :type training_status: str :param training_stats: Training statistics that will be saved to checkpoint along with the model. - :type training_stats: :py:class:miprometheus.utils.StatisticsAggregator or :py:class:miprometheus.utils.StatisticsAggregator + :type training_stats: :py:class:`miprometheus.utils.StatisticsCollector` or \ + :py:class:`miprometheus.utils.StatisticsAggregator` :param validation_stats: Validation statistics that will be saved to checkpoint along with the model. - :type validation_stats: :py:class:miprometheus.utils.StatisticsAggregator or :py:class:miprometheus.utils.StatisticsAggregator + :type validation_stats: :py:class:`miprometheus.utils.StatisticsCollector` or \ + :py:class:`miprometheus.utils.StatisticsAggregator` :return: True if this is currently the best model (until the current episode, considering the loss). """ - # Process training statistics. - if training_stats.__class__.__name__ == 'StatisticsCollector': - # "Copy" last values only. - train_stats = {k: v[-1] for k, v in training_stats.items()} - else: - # Simply copy values. - train_stats = {k: v for k, v in training_stats.items()} - - # Proces validation statistics, get the episode and loss. + # Process validation statistics, get the episode and loss. if validation_stats.__class__.__name__ == 'StatisticsCollector': # Get data from collector. episode = validation_stats['episode'][-1] loss = validation_stats['loss'][-1] - # "Copy" last values only. - valid_stats = {k: v[-1] for k, v in validation_stats.items()} else: - # Get data from aggregator. + # Get data from StatisticsAggregator. episode = validation_stats['episode'] loss = validation_stats['loss'] - # Simply copy values. - valid_stats = {k: v for k, v in validation_stats.items()} # Checkpoint to be saved. chkpt = {'name': self.name, 'state_dict': self.state_dict(), - 'timestamp': datetime.now(), + 'model_timestamp': datetime.now(), 'episode': episode, 'loss': loss, 'status': training_status, - 'training_stats': train_stats, - 'validation_stats': valid_stats + 'status_timestamp': datetime.now(), + 'training_stats': training_stats.export_to_checkpoint(), + 'validation_stats': validation_stats.export_to_checkpoint() } # Save the intermediate checkpoint. @@ -381,12 +373,24 @@ def save(self, model_dir, training_status, training_stats, validation_stats): # Save the best model. loss = loss.cpu() # moving loss value to cpu type to allow (initial) comparison with numpy type if loss < self.best_loss: + # Save best loss and status. self.best_loss = loss + self.best_status = training_status + # Save checkpoint. filename = model_dir + 'model_best.pt' torch.save(chkpt, filename) self.logger.info("Model and statistics exported to checkpoint {}".format(filename)) return True - + elif self.best_status != training_status: + filename = model_dir + 'model_best.pt' + # Load checkpoint. + chkpt_loaded = torch.load(filename, map_location=lambda storage, loc: storage) + # Update status and status time. + chkpt_loaded['status'] = training_status + chkpt_loaded['status_timestamp'] = datetime.now() + # Save updated checkpoint. + torch.save(chkpt_loaded, filename) + self.logger.info("Updated training status in checkpoint {}".format(filename)) # Else: that was not the best model. 
return False @@ -409,7 +413,7 @@ def load(self, checkpoint_file): self.logger.info( "Imported {} parameters from checkpoint from {} (episode: {}, loss: {}, status: {})".format( chkpt['name'], - chkpt['timestamp'], + chkpt['model_timestamp'], chkpt['episode'], chkpt['loss'], chkpt['status'] diff --git a/miprometheus/problems/image_to_class/mnist.py b/miprometheus/problems/image_to_class/mnist.py index 255487a2..3c8d5a09 100644 --- a/miprometheus/problems/image_to_class/mnist.py +++ b/miprometheus/problems/image_to_class/mnist.py @@ -80,7 +80,7 @@ def __init__(self, params_): >>> self.params.add_default_params({'data_folder': '~/data/mnist', >>> 'use_train_data': True}) - :param params: Dictionary of parameters (read from configuration ``.yaml`` file). + :param params_: Dictionary of parameters (read from configuration ``.yaml`` file). """ @@ -99,7 +99,7 @@ def __init__(self, params_): self.use_train_data = self.params['use_train_data'] # Add transformations depending on the resizing option. - if ('resize' in self.params): + if 'resize' in self.params: # Check the desired size. if len(self.params['resize']) != 2: self.logger.error("'resize' field must contain 2 values: the desired height and width") @@ -130,7 +130,6 @@ def __init__(self, params_): 'width': self.width, 'height': self.height} - self.data_definitions = {'images': {'size': [-1, self.num_channels, self.height, self.width], 'type': [torch.Tensor]}, 'targets': {'size': [-1], 'type': [torch.Tensor]}, 'targets_label': {'size': [-1, 1], 'type': [list, str]} @@ -138,7 +137,7 @@ def __init__(self, params_): # load the dataset self.dataset = datasets.MNIST(root=data_folder, train=self.use_train_data, download=True, - transform=transform) + transform=transform) # Set length. self.length = len(self.dataset) diff --git a/miprometheus/utils/statistics_aggregator.py b/miprometheus/utils/statistics_aggregator.py index ce3749e6..c4ebc5f9 100644 --- a/miprometheus/utils/statistics_aggregator.py +++ b/miprometheus/utils/statistics_aggregator.py @@ -16,10 +16,12 @@ # limitations under the License. """ -statistics_aggregator.py: Allows to compute several statistical aggregators (e.g. average, standard deviation...)\ - using the statistics collected over an epoch or a validation phase by the ``StatisticsCollector``. +statistics_aggregator.py: + + - Allows to compute several "statistical aggregators" (e.g. average, standard deviation...) using the \ + statistics collected over an epoch or a validation phase by the :py:class:`miprometheus.utils.StatisticsCollector`. + - Allows to summarize the current epoch or validation phase using statistical aggregators. - Allows to summarize the current epoch or validation phase using statistical aggregators. """ __author__ = "Vincent Marois, Tomasz Kornuta" @@ -32,12 +34,12 @@ class StatisticsAggregator(StatisticsCollector): """ Specialized class used for the computation of several statistical aggregators. - Inherits from ``StatisticsCollector`` as it extends its capabilities: it relies \ - on ``StatisticsCollector`` to collect the statistics over an epoch (training set) \ + Inherits from :py:class:`miprometheus.utils.StatisticsCollector` as it extends its capabilities: it relies \ + on :py:class:`miprometheus.utils.StatisticsCollector` to collect the statistics over an epoch (training set) \ or a validation (over the validation set). - Once the statistics have been collected, the ``StatisticsAggregator`` allows to compute several \ - statistical aggregators to summarize the last epoch or validation phase. 
+    Once the statistics have been collected, the :py:class:`miprometheus.utils.StatisticsAggregator` allows \
+    to compute several statistical aggregators to summarize the last epoch or validation phase.
 
     E.g. With the list of loss values from the last epoch, we can compute the average loss, the min & max, \
     and the standard deviation.
@@ -47,9 +49,9 @@ class StatisticsAggregator(StatisticsCollector):
 
     def __init__(self):
         """
-        Constructor for the ``StatisticsAggregator``. Defines empty aggregators dict.
+        Constructor for the :py:class:`miprometheus.utils.StatisticsAggregator`. Defines empty aggregators dict.
 
-        Other statistical aggregators can be added via ``self.add_aggregator()``.
+        Other statistical aggregators can be added via :py:func:`StatisticsAggregator.add_aggregator()`.
 
         """
         # call base constructor
@@ -63,9 +65,10 @@ def add_aggregator(self, key, formatting):
         The value associated to the specified key is initiated as -1.
 
         :param key: Statistical aggregator to add. Such aggregator (e.g. min, max, mean, std...)\
-        should be based on an existing statistics collected by the ``StatisticsCollector`` \
-        (e.g. added by ``add_statistic`` and collected by ``model.collect_statistics`` or \
-        ``problem.collect_statistics``.
+        should be based on an existing statistic collected by the :py:class:`miprometheus.utils.StatisticsCollector` \
+        (e.g. added by :py:func:`StatisticsCollector.add_statistic()` and collected by \
+        :py:func:`miprometheus.models.Model.collect_statistics()` or \
+        :py:func:`miprometheus.problems.Problem.collect_statistics()`).
         :type key: str
 
         :param formatting: Formatting that will be used when logging and exporting to CSV.
@@ -183,6 +186,24 @@ def export_to_csv(self, csv_file=None):
 
             csv_file.write(values_str)
 
+    def export_to_checkpoint(self):
+        """
+        This method exports the aggregated data into a dictionary using the associated formatting.
+
+        """
+        chkpt = {}
+
+        # Iterate through key, values and format them.
+        for key, value in self.aggregators.items():
+
+            # Get formatting - using '{}' as default.
+            format_str = self.formatting.get(key, '{}')
+
+            # Add to dict.
+            chkpt[key] = format_str.format(value)
+
+        return chkpt
+
     def export_to_string(self, additional_tag=''):
         """
         This method returns the current statistical aggregators values in the form of a string using the \
@@ -216,13 +237,13 @@ def export_to_tensorboard(self, tb_writer = None):
         Method exports current statistical aggregators values to TensorBoard.
 
         :param tb_writer: TensorBoard writer, optional
-        :type tb_writer: ``tensorboardX.SummaryWriter``
+        :type tb_writer: :py:class:`tensorboardX.SummaryWriter`
 
         """
         # Get episode number.
         episode = self.aggregators['episode']
 
-        if (tb_writer is None):
+        if tb_writer is None:
             tb_writer = self.tb_writer
         # If it is still None - well, we cannot do anything more.
         if tb_writer is None:
@@ -249,10 +270,8 @@ def export_to_tensorboard(self, tb_writer = None):
         for episode, loss in enumerate(loss_values):
             stat_col['episode'] = episode
             stat_col['loss'] = loss
-            #print(stat_col.export_statistics_to_string())
-
-            # Aggregate.
- stat_agg.aggregate_statistics(stat_col) + # print(stat_col.export_statistics_to_string()) + print(stat_agg.export_to_string()) # Add new aggregator (a simulation of "additional statistics collected by model") diff --git a/miprometheus/utils/statistics_collector.py b/miprometheus/utils/statistics_collector.py index afe162ce..f6ac958f 100644 --- a/miprometheus/utils/statistics_collector.py +++ b/miprometheus/utils/statistics_collector.py @@ -30,8 +30,7 @@ class StatisticsCollector(Mapping): Specialized class used for the collection and export of statistics during\ training, validation and testing. - Inherits ``collections.Mapping``, therefore it offers functionality\ - close to a ``dict``. + Inherits :py:class:`collections.Mapping`, therefore it offers functionality close to a ``dict``. """ @@ -172,6 +171,24 @@ def export_to_csv(self, csv_file=None): csv_file.write(values_str) + def export_to_checkpoint(self): + """ + This method exports the collected data into a dictionary using the associated formatting. + + """ + chkpt = {} + + # Iterate through key, values and format them. + for key, value in self.statistics.items(): + + # Get formatting - using '{}' as default. + format_str = self.formatting.get(key, '{}') + + # Add to dict. + chkpt[key] = format_str.format(value[-1]) + + return chkpt + def export_to_string(self, additional_tag=''): """ Method returns current statistics in the form of string using the @@ -181,7 +198,7 @@ def export_to_string(self, additional_tag=''): :type additional_tag: str - :return: String being the concatenation of the statistical aggregators names & values. + :return: String being the concatenation of the statistics names & values. """ # Iterate through keys and values and concatenate them. @@ -209,6 +226,7 @@ def export_to_tensorboard(self, tb_writer=None): Method exports current statistics to tensorboard. :param tb_writer: TensorBoard writer, optional. + :type tb_writer: :py:class:`tensorboardX.SummaryWriter` """ # Get episode number. diff --git a/miprometheus/workers/offline_trainer.py b/miprometheus/workers/offline_trainer.py index ef274a25..da4a1325 100644 --- a/miprometheus/workers/offline_trainer.py +++ b/miprometheus/workers/offline_trainer.py @@ -169,8 +169,8 @@ def run_experiment(self): # Reset the counter. episode = -1 - # Set default termination cause. - training_status = "Epoch limit reached" + # Set initial status. + training_status = "Not Converged" # Iterate over epochs. for epoch in range(self.epoch_limit): self.logger.info('Starting next epoch: {}'.format(epoch)) @@ -270,15 +270,16 @@ def run_experiment(self): self.validate_on_batch(self.validation_batch, episode, epoch) # Aggregate statistics, but do not display them in log. - self.aggregate_and_export_statistics(self.model, self.validation_problem, - self.validation_stat_col, self.validation_stat_agg, episode, '[Partial Validation]', False) + # self.aggregate_and_export_statistics(self.model, self.validation_problem, + # self.validation_stat_col, self.validation_stat_agg, + # episode, '[Partial Validation]', False) - # Do not save the model! Offline trainer uses full set fot determining whether to save or not. + # Do not save the model: OfflineTrainer uses the full set to determine whether to save or not. # III. The episodes number limit has been reached. if episode+1 >= self.episode_limit: - training_status = "Episode Limit reached" - break # the inner loop. + training_status = "Not converged: Episode Limit reached" + break # the inner loop. # Epoch just ended! (or episode limit). 
# Inform the problem class that the epoch has ended. @@ -313,7 +314,12 @@ def run_experiment(self): # Check the Full Validation loss. if self.validation_stat_agg["loss"] < self.loss_stop: - training_status = "Model converged (Full Validation Loss went below Loss Stop threshold)" + # Change the status... + training_status = "Converged (Full Validation Loss went below Loss Stop threshold)" + + # ... and THEN try to save the model using the average validation loss. + self.model.save(self.model_dir, training_status, self.training_stat_agg, self.validation_stat_agg) + break # II. Early stopping is set and loss hasn't improved by delta in n epochs. @@ -329,6 +335,12 @@ def run_experiment(self): ''' End of main training and validation loop. Perform final full validation. ''' + # Try to save the model only if we hit the epoch limit. + if epoch+1 >= self.epoch_limit: + # Change the status. + training_status = "Not converged: Epoch Limit reached" + + # Display status. self.logger.info('\n' + '='*80) self.logger.info('Training finished because {}'.format(training_status)) # Check visualization flag - turn on visualization for last validation if needed. @@ -340,7 +352,10 @@ def run_experiment(self): # Validate over the entire validation set. self.validate_on_set(episode, epoch) - # Do not save the model, as we tried it already on "last" validation. + # Try to save the model only if we hit the epoch limit. + if epoch+1 >= self.epoch_limit: + # Try to save the model using the average validation loss. + self.model.save(self.model_dir, training_status, self.training_stat_agg, self.validation_stat_agg) self.logger.info('Experiment finished!') diff --git a/miprometheus/workers/online_trainer.py b/miprometheus/workers/online_trainer.py index a39ce28b..a9a7e4ae 100644 --- a/miprometheus/workers/online_trainer.py +++ b/miprometheus/workers/online_trainer.py @@ -165,7 +165,7 @@ def run_experiment(self): # Inform the training problem class that epoch has started. self.training_problem.initialize_epoch(epoch) - # Set default termination cause. + # Set initial status. training_status = "Not Converged" for training_dict in self.training_dataloader: @@ -265,8 +265,12 @@ def run_experiment(self): # Check the Partial Validation loss. if (validation_loss < self.loss_stop): - training_status = "Model converged (Partial Validation Loss went below " \ + # Change the status... + training_status = "Converged (Partial Validation Loss went below " \ "Loss Stop threshold)" + + # ... and THEN save the model using the latest validation statistics. + self.model.save(self.model_dir, training_status, self.training_stat_col, self.validation_stat_col) break # II. Early stopping is set and loss hasn't improved by delta in n epochs. @@ -277,7 +281,7 @@ def run_experiment(self): if episode+1 >= self.episode_limit: # If we reach this condition, then it is possible that the model didn't converge correctly # but it currently might get better since last validation. - training_status = "Episode Limit reached" + training_status = "Not converged: Episode Limit reached" break # Check if we are at the end of the 'epoch': indicate that the DataLoader is now cycling. @@ -296,7 +300,7 @@ def run_experiment(self): # IV. Epoch limit has been reached. if epoch+1 >= self.epoch_limit: - training_status = "Epoch Limit reached" + training_status = "Not converged: Epoch Limit reached" # "Finish" the training. break @@ -324,7 +328,7 @@ def run_experiment(self): # Perform validation. 
self.validate_on_batch(self.validation_batch, episode, epoch) - # Save the model using the latest validation statistics. + # Try to save the model using the latest validation statistics. self.model.save(self.model_dir, training_status, self.training_stat_col, self.validation_stat_col) self.logger.info('\n' + '='*80) diff --git a/miprometheus/workers/trainer.py b/miprometheus/workers/trainer.py index c19f494b..7799a4cb 100644 --- a/miprometheus/workers/trainer.py +++ b/miprometheus/workers/trainer.py @@ -191,7 +191,7 @@ def setup_experiment(self): time_str = '{0:%Y%m%d_%H%M%S}'.format(datetime.now()) if self.flags.savetag != '': time_str = time_str + "_" + self.flags.savetag - self.log_dir = self.flags.outdir + '/' + training_problem_name + '/' + model_name + '/' + time_str + '/' + self.log_dir = self.flags.expdir + '/' + training_problem_name + '/' + model_name + '/' + time_str + '/' os.makedirs(self.log_dir, exist_ok=False) except FileExistsError: sleep(1) diff --git a/miprometheus/workers/worker.py b/miprometheus/workers/worker.py index f57eba49..56af00a4 100644 --- a/miprometheus/workers/worker.py +++ b/miprometheus/workers/worker.py @@ -117,11 +117,11 @@ def __init__(self, name, add_default_parser_args = True): help='The current worker will move the computations on GPU devices, if available ' 'in the system. (Default: False)') - self.parser.add_argument('--outdir', - dest='outdir', + self.parser.add_argument('--expdir', + dest='expdir', type=str, default="./experiments", - help='Path to the output directory where the experiment(s) folders will be stored.' + help='Path to the directory where the experiment(s) folders are/will be stored.' ' (DEFAULT: ./experiments)') self.parser.add_argument('--savetag', @@ -380,8 +380,6 @@ def export_experiment_configuration(self, log_dir, filename, user_confirm): if user_confirm: try: input('Press to confirm and start the experiment\n') - except Exception: - pass except KeyboardInterrupt: exit(0) diff --git a/setup.py b/setup.py index 3e334dd5..c272379e 100644 --- a/setup.py +++ b/setup.py @@ -159,6 +159,7 @@ 'tensorboardX', 'matplotlib', 'numpy', + 'psutil', 'PyYAML', 'tqdm', 'nltk',
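
Note on the reworked checkpoint format (illustration only, not part of the patch): with these changes, Model.save() stores 'model_timestamp' (formerly 'timestamp'), a new 'status_timestamp', and the formatted statistics dictionaries returned by export_to_checkpoint(). The sketch below shows how such a checkpoint could be inspected; the checkpoint path is hypothetical and depends on the --expdir, problem, model and run timestamp of an actual experiment.

# Minimal sketch: inspect a checkpoint written by the updated Model.save().
# The path below is hypothetical - point it at a real run created under --expdir.
import torch

checkpoint_path = './experiments/MNIST/LeNet5/20181105_120000/model_best.pt'  # hypothetical path

# Load on CPU, mirroring the map_location used in Model.save()/load().
chkpt = torch.load(checkpoint_path, map_location=lambda storage, loc: storage)

# Renamed/new fields: 'model_timestamp' (was 'timestamp') and 'status_timestamp'.
print("Model:", chkpt['name'])
print("Episode: {}, loss: {}, status: {}".format(chkpt['episode'], chkpt['loss'], chkpt['status']))
print("Saved at: {}, status updated at: {}".format(chkpt['model_timestamp'], chkpt['status_timestamp']))

# 'training_stats' and 'validation_stats' are plain dicts of formatted strings,
# produced by StatisticsCollector/StatisticsAggregator.export_to_checkpoint().
for key, value in chkpt['validation_stats'].items():
    print("  {}: {}".format(key, value))

Because export_to_checkpoint() applies the registered formatting strings, these two dictionaries appear intended for human inspection and reporting rather than for restoring numerical state.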