diff --git a/apps/backend/app.py b/apps/backend/app.py
index 085b8a9e..996e9b1f 100644
--- a/apps/backend/app.py
+++ b/apps/backend/app.py
@@ -7,7 +7,7 @@
 from flask import Flask, Response, request, jsonify
 from kubernetes import client, config
 import pymongo
-from modules.mongo import upload_experiment_aggregated_results, upload_experiment_zip, upload_log_file, verify_mongo_connection, check_insert_default_experiments, download_experiment_file, get_experiment
+from modules.mongo import upload_experiment_aggregated_results, upload_experiment_zip, upload_log_file, verify_mongo_connection, check_insert_default_experiments, download_experiment_file, get_experiment, update_exp_value
 from spawn_runner import create_job, create_job_object
 
 flaskApp = Flask(__name__)
@@ -112,6 +112,34 @@ def get_experiment_post():
         return {'contents': get_experiment(experiment_id, mongoClient)}
     except Exception:
         return Response(status=500)
+
+@flaskApp.post("/updateExperiment")
+def update_experiment():
+    """Set a single field on an experiment document.
+
+    Expects a JSON object with the keys ``experimentId``, ``field`` and
+    ``newValue``. Responds 200 once the update has been applied, 400 when
+    the body is not a JSON object or a key is missing, and 500 when the
+    update itself fails.
+    """
+    # silent=True returns None instead of raising on a missing or malformed
+    # JSON body, so a bad request is reported as 400 rather than 500.
+    payload = request.get_json(silent=True)
+    if not isinstance(payload, dict):
+        return Response(status=400)
+    try:
+        experiment_id = payload['experimentId']
+        field = payload['field']
+        new_value = payload['newValue']
+    except KeyError:
+        return Response(status=400)
+    try:
+        update_exp_value(experiment_id, field, new_value, mongoClient)
+    except Exception:
+        # Same 500 response as get_experiment_post, but keep the traceback.
+        flaskApp.logger.exception("Failed to update experiment %s", experiment_id)
+        return Response(status=500)
+    return Response(status=200)
 
 if __name__ == '__main__':
     flaskApp.run()
diff --git a/apps/backend/modules/mongo.py b/apps/backend/modules/mongo.py
index 4707b0a8..ada93b56 100644
--- a/apps/backend/modules/mongo.py
+++ b/apps/backend/modules/mongo.py
@@ -97,3 +97,19 @@ def get_experiment(expId: str, mongoClient: pymongo.MongoClient):
     experiment["id"] = expId
     experiment["expId"] = expId
     return experiment
+
+def update_exp_value(expId: str, field: str, newValue, mongoClient: pymongo.MongoClient):
+    """Set ``field`` to ``newValue`` on the experiment whose ``_id`` is ``expId``.
+
+    ``newValue`` is left unannotated because callers pass ints and bools as
+    well as strings.
+
+    Raises:
+        LookupError: if no experiment document has the given id.
+    """
+    experimentsCollection = mongoClient["gladosdb"].experiments
+    result = experimentsCollection.update_one({"_id": ObjectId(expId)}, {"$set": {field: newValue}})
+    # update_one does not fail when nothing matches; raise so the caller
+    # cannot mistake a no-op for a successful write.
+    if result.matched_count == 0:
+        raise LookupError(f"No experiment found with id {expId}")
diff --git a/apps/runner/modules/runner.py b/apps/runner/modules/runner.py
index 86ff44fe..c7e28c37 100644 --- a/apps/runner/modules/runner.py +++ b/apps/runner/modules/runner.py @@ -11,6 +11,7 @@ from modules.exceptions import InternalTrialFailedError from modules.configs import get_config_paramNames from modules.logging.gladosLogging import get_experiment_logger +from modules.utils import update_exp_value PROCESS_OUT_STREAM = 0 PROCESS_ERROR_STREAM = 1 @@ -85,7 +86,7 @@ def _add_to_output_batch(trialExtraFile, ExpRun: int): raise FileHandlingError("Failed to copy results csv. Maybe there was a typo in the filepath?") from err -def conduct_experiment(experiment: ExperimentData, expRef): +def conduct_experiment(experiment: ExperimentData): """ Call this function when inside the experiment folder! """ @@ -100,7 +101,8 @@ def conduct_experiment(experiment: ExperimentData, expRef): for trialNum in range(0, experiment.totalExperimentRuns): startSeconds = time.time() if trialNum == 0: - expRef.update({"startedAtEpochMillis": int(startSeconds * 1000)}) + # expRef.update({"startedAtEpochMillis": int(startSeconds * 1000)}) + update_exp_value(experiment.expId, "startedAtEpochMillis", int(startSeconds * 1000)) try: configFileName = create_config_from_data(experiment, trialNum) @@ -111,7 +113,7 @@ def conduct_experiment(experiment: ExperimentData, expRef): try: _run_trial(experiment, f'configFiles/{configFileName}', trialNum) except (TrialTimeoutError, InternalTrialFailedError) as err: - _handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err) + _handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err) continue endSeconds = time.time() @@ -120,12 +122,13 @@ def conduct_experiment(experiment: ExperimentData, expRef): if trialNum == 0: estimatedTotalTimeMinutes = timeTakenMinutes * experiment.totalExperimentRuns explogger.info(f"Estimated minutes to run: {estimatedTotalTimeMinutes}") - expRef.update({'estimatedTotalTimeMinutes': estimatedTotalTimeMinutes}) + # 
expRef.update({'estimatedTotalTimeMinutes': estimatedTotalTimeMinutes}) + update_exp_value(experiment.expId, 'estimatedTotalTimeMinutes', estimatedTotalTimeMinutes) try: csvHeader = _get_line_n_of_trial_results_csv(0, experiment.trialResult) except GladosUserError as err: - _handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err) + _handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err) return numOutputs = len(csvHeader) writer.writerow(["Experiment Run"] + csvHeader + paramNames) @@ -134,23 +137,24 @@ def conduct_experiment(experiment: ExperimentData, expRef): try: _add_to_output_batch(experiment.trialExtraFile, trialNum) except FileHandlingError as err: - _handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err) + _handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err) continue try: output = _get_line_n_of_trial_results_csv(1, experiment.trialResult) except GladosUserError as err: - _handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err) + _handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err) continue writer.writerow([trialNum] + output + get_configs_ordered(f'configFiles/{trialNum}.ini', paramNames)) explogger.info(f'Trial#{trialNum} completed') experiment.passes += 1 - expRef.update({'passes': experiment.passes}) + # expRef.update({'passes': experiment.passes}) + update_exp_value(experiment.expId, 'passes', experiment.passes) explogger.info("Finished running Trials") -def _handle_trial_error(experiment: ExperimentData, expRef, numOutputs: int, paramNames: "list", writer, trialNum: int, err: BaseException): +def _handle_trial_error(experiment: ExperimentData, numOutputs: int, paramNames: "list", writer, trialNum: int, err: BaseException): csvErrorValue = None if isinstance(err, TrialTimeoutError): csvErrorValue = "TIMEOUT" @@ -160,7 +164,8 @@ def _handle_trial_error(experiment: ExperimentData, expRef, 
numOutputs: int, par
     explogger.error(f'Trial#{trialNum} Encountered an Error')
     explogger.exception(err)
     experiment.fails += 1
-    expRef.update({'fails': experiment.fails})
+    # expRef.update({'fails': experiment.fails})
+    update_exp_value(experiment.expId, 'fails', experiment.fails)
     if trialNum == 0:
         message = f"First trial of {experiment.expId} ran into an error while running, aborting the whole experiment. Read the traceback to find out what the actual cause of this problem is (it will not necessarily be at the top of the stack trace)."
         raise ExperimentAbort(message) from err
diff --git a/apps/runner/modules/utils.py b/apps/runner/modules/utils.py
index 18ac4330..f58823d1 100644
--- a/apps/runner/modules/utils.py
+++ b/apps/runner/modules/utils.py
@@ -86,6 +86,31 @@ def get_experiment_with_id(experimentId: str):
         return response.json()['contents']
     else:
         raise DatabaseConnectionError("Error while getting experiment from backend!")
+
+def update_exp_value(experimentId: str, field: str, newValue):
+    """Ask the backend to set one field on an experiment document.
+
+    Posts ``experimentId``, ``field`` and ``newValue`` as JSON to the
+    backend's ``/updateExperiment`` route.
+
+    Raises:
+        DatabaseConnectionError: if the request cannot be completed or the
+            backend responds with any status other than 200.
+    """
+    url = f'http://glados-service-backend:{os.getenv("BACKEND_PORT")}/updateExperiment'
+    payload = {
+        "experimentId": experimentId,
+        "field": field,
+        "newValue": newValue
+    }
+    try:
+        # A timeout keeps a stalled backend from blocking the runner forever.
+        response = requests.post(url, json=payload, timeout=30)
+    except requests.RequestException as err:
+        # Surface network failures as the same error type as a non-200 reply.
+        raise DatabaseConnectionError("Error updating experiment document!") from err
+    if response.status_code != 200:
+        raise DatabaseConnectionError("Error updating experiment document!")
 
 def _callBackend(url, payload, logMsg):
     try:
diff --git a/apps/runner/runner.py b/apps/runner/runner.py
index 9a1dec4a..4a326d9b 100644
--- a/apps/runner/runner.py
+++ b/apps/runner/runner.py
@@ -23,7 +23,7 @@
 from modules.exceptions import CustomFlaskError, DatabaseConnectionError, GladosInternalError, ExperimentAbort, GladosUserError
 from modules.output.plots import generateScatterPlot
 from modules.configs import generate_config_files
-from modules.utils import _get_env, upload_experiment_aggregated_results, upload_experiment_log, upload_experiment_zip, verify_mongo_connection, get_experiment_with_id
+from modules.utils import
_get_env, upload_experiment_aggregated_results, upload_experiment_log, upload_experiment_zip, verify_mongo_connection, get_experiment_with_id, update_exp_value
 
 try:
     import magic  # Crashes on windows if you're missing the 'python-magic-bin' python package
@@ -104,7 +104,7 @@ def run_batch(data: IncomingStartRequest):
         else:
             explogger.error("Error generating hyperparameters - Validation error")
             explogger.exception(err)
-        # close_experiment_run(expId, expRef)
+        close_experiment_run(expId)
         return
 
     experimentData['hyperparameters'] = hyperparameters
@@ -115,7 +115,7 @@ def run_batch(data: IncomingStartRequest):
     except ValueError as err:
         explogger.error("Experiment data was not formatted correctly, aborting")
         explogger.exception(err)
-        # close_experiment_run(expId, expRef)
+        close_experiment_run(expId)
         return
 
     #Downloading Experiment File
@@ -130,7 +130,7 @@ def run_batch(data: IncomingStartRequest):
         explogger.error("This is not a supported experiment file type, aborting")
         explogger.exception(err)
         os.chdir('../..')
-        # close_experiment_run(expId, expRef)
+        close_experiment_run(expId)
         return
 
     explogger.info(f"Generating configs and downloading to ExperimentFiles/{expId}/configFiles")
@@ -139,15 +139,16 @@ def run_batch(data: IncomingStartRequest):
     if totalExperimentRuns == 0:
         os.chdir('../..')
         explogger.exception(GladosInternalError("Error generating configs - somehow no config files were produced?"))
-        # close_experiment_run(expId, expRef)
+        close_experiment_run(expId)
         return
 
     experiment.totalExperimentRuns = totalExperimentRuns
 
-    expRef.update({"totalExperimentRuns": experiment.totalExperimentRuns})
+    # expRef.update({"totalExperimentRuns": experiment.totalExperimentRuns})
+    update_exp_value(expId, "totalExperimentRuns", experiment.totalExperimentRuns)
 
     try:
-        conduct_experiment(experiment, expRef)
+        conduct_experiment(experiment)
         post_process_experiment(experiment)
         upload_experiment_results(experiment)
     except ExperimentAbort as err:
@@ -159,14 +160,24 @@ def run_batch(data: IncomingStartRequest):
     finally:
         # We need to be out of the dir for the log file upload to work
         os.chdir('../..')
-        # close_experiment_run(expId, expRef)
+        # close_experiment_run(expId)
 
-def close_experiment_run(expId: DocumentId, expRef: "typing.Any | None"):
+def close_experiment_run(expId: DocumentId):
+    """Mark an experiment as finished in the database, then clean up.
+
+    Records ``finishedAtEpochMillis`` and ``finished`` through
+    ``update_exp_value``, then closes the experiment logger, uploads the
+    experiment log and removes the downloaded directory. A failed status
+    update is logged and does not prevent the cleanup steps.
+    """
     explogger.info(f'Exiting experiment {expId}')
-    if expRef:
-        expRef.update({'finished': True, 'finishedAtEpochMillis': int(time.time() * 1000)})
-    else:
-        syslogger.warning(f'No experiment ref supplied when closing {expId} , could not update it to finished')
+    try:
+        # Timestamp first, so a reader that sees finished=True can expect it.
+        update_exp_value(expId, 'finishedAtEpochMillis', int(time.time() * 1000))
+        update_exp_value(expId, 'finished', True)
+    except DatabaseConnectionError as err:
+        syslogger.warning(f'Could not update experiment {expId} to finished')
+        syslogger.exception(err)
     close_experiment_logger()
     upload_experiment_log(expId)
     remove_downloaded_directory(expId)