Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/mongo-dev' into development
Browse files Browse the repository at this point in the history
  • Loading branch information
rhit-windsors committed Nov 10, 2024
2 parents d4abbf7 + c6872ef commit c32af63
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 69 deletions.
22 changes: 21 additions & 1 deletion apps/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from flask import Flask, Response, request, jsonify, send_file
from kubernetes import client, config
import pymongo
from modules.mongo import upload_experiment_aggregated_results, upload_experiment_zip, upload_log_file, verify_mongo_connection, check_insert_default_experiments, download_experiment_file
from modules.mongo import upload_experiment_aggregated_results, upload_experiment_zip, upload_log_file, verify_mongo_connection, check_insert_default_experiments, download_experiment_file, get_experiment, update_exp_value

from spawn_runner import create_job, create_job_object
flaskApp = Flask(__name__)
Expand Down Expand Up @@ -108,6 +108,26 @@ def download_exp_file():
return send_file(file_stream, as_attachment=True, download_name="experiment_file", mimetype="application/octet-stream")
except Exception:
return Response(status=500)

@flaskApp.post("/getExperiment")
def get_experiment_post():
try:
experiment_id = request.get_json()['experimentId']
return {'contents': get_experiment(experiment_id, mongoClient)}
except Exception:
return Response(status=500)

@flaskApp.post("/updateExperiment")
def update_experiment():
try:
json = request.get_json()
experiment_id = json['experimentId']
field = json['field']
newVal = json['newValue']
update_exp_value(experiment_id, field, newVal, mongoClient)
return Response(status=200)
except Exception:
return Response(status=500)

if __name__ == '__main__':
flaskApp.run()
24 changes: 18 additions & 6 deletions apps/backend/modules/mongo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import pymongo
from pymongo.errors import ConnectionFailure
from bson import Binary
from bson import Binary, ObjectId
from gridfs import GridFSBucket

def verify_mongo_connection(mongoClient: pymongo.MongoClient):
Expand All @@ -12,7 +12,7 @@ def verify_mongo_connection(mongoClient: pymongo.MongoClient):
raise Exception("MongoDB server not available/unreachable") from err

def upload_experiment_aggregated_results(experimentId: str, results: str, mongoClient: pymongo.MongoClient):
experimentResultEntry = {"_id": experimentId, "resultContent": results}
experimentResultEntry = {"experimentId": experimentId, "resultContent": results}
# Get the results connection
resultsCollection = mongoClient["gladosdb"].results
try:
Expand All @@ -25,17 +25,16 @@ def upload_experiment_aggregated_results(experimentId: str, results: str, mongoC
raise Exception("Encountered error while storing aggregated results in MongoDB") from err

def upload_experiment_zip(experimentId: str, encoded: Binary, mongoClient: pymongo.MongoClient):
experimentZipEntry = {"_id": experimentId, "fileContent": encoded}
experimentZipEntry = {"experimentId": experimentId, "fileContent": encoded}
zipCollection = mongoClient["gladosdb"].zips
try:
# TODO: Refactor to call the backend
resultZipId = zipCollection.insert_one(experimentZipEntry).inserted_id
return resultZipId
except Exception as err:
raise Exception("Encountered error while storing results zip in MongoDB") from err

def upload_log_file(experimentId: str, contents: str, mongoClient: pymongo.MongoClient):
logFileEntry = {"_id": experimentId, "fileContent": contents}
logFileEntry = {"experimentId": experimentId, "fileContent": contents}
logCollection = mongoClient["gladosdb"].logs
try:
resultId = logCollection.insert_one(logFileEntry).inserted_id
Expand Down Expand Up @@ -89,4 +88,17 @@ def download_experiment_file(expId: str, mongoClient: pymongo.MongoClient):
file = bucket.open_download_stream_by_name(file_name)
contents = file.read()
return contents


def get_experiment(expId: str, mongoClient: pymongo.MongoClient):
experimentsCollection = mongoClient["gladosdb"].experiments
experiment = experimentsCollection.find_one({"_id": ObjectId(expId)}, {"_id": 0})
if experiment is None:
raise Exception("Could not find experiment!")
experiment["id"] = expId
experiment["expId"] = expId
return experiment

def update_exp_value(expId: str, field: str, newValue: str, mongoClient: pymongo.MongoClient):
experimentsCollection = mongoClient["gladosdb"].experiments
experimentsCollection.update_one({"_id": ObjectId(expId)}, {"$set": {field: newValue}})
return
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const mongoCSVHandler: NextApiHandler<ResultsCsv> = async (req, res) => {
results = await db
.collection(COLLECTION_RESULTS_CSVS)
// TODO correct mongodb typescript type for id
.find({ '_id': expIdToCsvDownload as any }).toArray();
.find({ 'experimentId': expIdToCsvDownload as any }).toArray();
} catch (error) {
const message = 'Failed to download the csv';
console.error('Error contacting server: ', error);
Expand Down
2 changes: 1 addition & 1 deletion apps/frontend/pages/api/download/logs/[idOfLogFile].tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const mongoLogHandler: NextApiHandler<String> = async (req, res) => {
results = await db
.collection(COLLECTION_LOGS)
// TODO correct mongodb typescript type for id
.find({ '_id': idOfLogFile as any }).toArray();
.find({ 'experimentId': idOfLogFile as any }).toArray();
} catch (error) {
const message = 'Failed to download the log file';
console.error('Error contacting server: ', error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const mongoZipHandler: NextApiHandler<ProjectZip> = async (req, res) => {
results = await db
.collection(COLLECTION_ZIPS)
// TODO correct mongodb typescript type for id
.find({ '_id': expIdToZipDownload as any }).toArray();
.find({ 'experimentId': expIdToZipDownload as any }).toArray();
} catch (error) {
const message = 'Failed to download the zip';
console.error('Error contacting server: ', error);
Expand Down
25 changes: 15 additions & 10 deletions apps/runner/modules/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from modules.exceptions import InternalTrialFailedError
from modules.configs import get_config_paramNames
from modules.logging.gladosLogging import get_experiment_logger
from modules.utils import update_exp_value

PROCESS_OUT_STREAM = 0
PROCESS_ERROR_STREAM = 1
Expand Down Expand Up @@ -85,7 +86,7 @@ def _add_to_output_batch(trialExtraFile, ExpRun: int):
raise FileHandlingError("Failed to copy results csv. Maybe there was a typo in the filepath?") from err


def conduct_experiment(experiment: ExperimentData, expRef):
def conduct_experiment(experiment: ExperimentData):
"""
Call this function when inside the experiment folder!
"""
Expand All @@ -100,7 +101,8 @@ def conduct_experiment(experiment: ExperimentData, expRef):
for trialNum in range(0, experiment.totalExperimentRuns):
startSeconds = time.time()
if trialNum == 0:
expRef.update({"startedAtEpochMillis": int(startSeconds * 1000)})
# expRef.update({"startedAtEpochMillis": int(startSeconds * 1000)})
update_exp_value(experiment.expId, "startedAtEpochMillis", int(startSeconds * 1000))

try:
configFileName = create_config_from_data(experiment, trialNum)
Expand All @@ -111,7 +113,7 @@ def conduct_experiment(experiment: ExperimentData, expRef):
try:
_run_trial(experiment, f'configFiles/{configFileName}', trialNum)
except (TrialTimeoutError, InternalTrialFailedError) as err:
_handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err)
_handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err)
continue

endSeconds = time.time()
Expand All @@ -120,12 +122,13 @@ def conduct_experiment(experiment: ExperimentData, expRef):
if trialNum == 0:
estimatedTotalTimeMinutes = timeTakenMinutes * experiment.totalExperimentRuns
explogger.info(f"Estimated minutes to run: {estimatedTotalTimeMinutes}")
expRef.update({'estimatedTotalTimeMinutes': estimatedTotalTimeMinutes})
# expRef.update({'estimatedTotalTimeMinutes': estimatedTotalTimeMinutes})
update_exp_value(experiment.expId, 'estimatedTotalTimeMinutes', estimatedTotalTimeMinutes)

try:
csvHeader = _get_line_n_of_trial_results_csv(0, experiment.trialResult)
except GladosUserError as err:
_handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err)
_handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err)
return
numOutputs = len(csvHeader)
writer.writerow(["Experiment Run"] + csvHeader + paramNames)
Expand All @@ -134,23 +137,24 @@ def conduct_experiment(experiment: ExperimentData, expRef):
try:
_add_to_output_batch(experiment.trialExtraFile, trialNum)
except FileHandlingError as err:
_handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err)
_handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err)
continue

try:
output = _get_line_n_of_trial_results_csv(1, experiment.trialResult)
except GladosUserError as err:
_handle_trial_error(experiment, expRef, numOutputs, paramNames, writer, trialNum, err)
_handle_trial_error(experiment, numOutputs, paramNames, writer, trialNum, err)
continue
writer.writerow([trialNum] + output + get_configs_ordered(f'configFiles/{trialNum}.ini', paramNames))

explogger.info(f'Trial#{trialNum} completed')
experiment.passes += 1
expRef.update({'passes': experiment.passes})
# expRef.update({'passes': experiment.passes})
update_exp_value(experiment.expId, 'passes', experiment.passes)
explogger.info("Finished running Trials")


def _handle_trial_error(experiment: ExperimentData, expRef, numOutputs: int, paramNames: "list", writer, trialNum: int, err: BaseException):
def _handle_trial_error(experiment: ExperimentData, numOutputs: int, paramNames: "list", writer, trialNum: int, err: BaseException):
csvErrorValue = None
if isinstance(err, TrialTimeoutError):
csvErrorValue = "TIMEOUT"
Expand All @@ -160,7 +164,8 @@ def _handle_trial_error(experiment: ExperimentData, expRef, numOutputs: int, par
explogger.error(f'Trial#{trialNum} Encountered an Error')
explogger.exception(err)
experiment.fails += 1
expRef.update({'fails': experiment.fails})
# expRef.update({'fails': experiment.fails})
update_exp_value(experiment.expId, 'fails', experiment.fails)
if trialNum == 0:
message = f"First trial of {experiment.expId} ran into an error while running, aborting the whole experiment. Read the traceback to find out what the actual cause of this problem is (it will not necessarily be at the top of the stack trace)."
raise ExperimentAbort(message) from err
Expand Down
121 changes: 100 additions & 21 deletions apps/runner/modules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,69 @@ def _get_env(key: str):
return value

def verify_mongo_connection():
"""Verify the connection to the mongo database
Raises:
DatabaseConnectionError: error that connection was unsuccessful
"""
url = f'http://glados-service-backend:{os.getenv("BACKEND_PORT")}/mongoPulse'
response = requests.get(url)
response = requests.get(url, timeout=10)
if response.ok:
return
else:
raise DatabaseConnectionError("MongoDB server not available/unreachable")



def upload_experiment_aggregated_results(experiment: ExperimentData, resultContent: str):
"""Upload experiment results to the database
Args:
experiment (ExperimentData): experiment data
resultContent (str): csv data
"""
# Call the backend
url = f'http://glados-service-backend:{os.getenv("BACKEND_PORT")}/uploadResults'
payload = {
"experimentId": experiment.expId,
"results": resultContent
}
_callBackend(url, payload, "inserted result csv into mongodb with id")
}
_call_backend(url, payload, "inserted result csv into mongodb with id")


def upload_experiment_zip(experiment: ExperimentData, encoded: Binary):
"""Upload experiment zip
Args:
experiment (ExperimentData): experiment data
encoded (Binary): encoded binary of zip for mongo
"""
# Call the backend
url = f'http://glados-service-backend:{os.getenv("BACKEND_PORT")}/uploadZip'
payload = {
"experimentId": experiment.expId,
"encoded": base64.b64encode(encoded).decode("utf-8")
}
_callBackend(url, payload, "inserted zip into mongodb with id")
}
_call_backend(url, payload, "inserted zip into mongodb with id")

def upload_experiment_log(experimentId: DocumentId):
filePath = get_filepath_for_experiment_log(experimentId)
get_system_logger().info('Uploading log file to the database: %s', filePath)
"""Upload the experiment log
Args:
experimentId (DocumentId): experiment data
Raises:
GladosInternalError: error raised
GladosInternalError: error raised
"""
file_path = get_filepath_for_experiment_log(experimentId)
get_system_logger().info('Uploading log file to the database: %s', file_path)

if len(explogger.handlers) != 0:
raise GladosInternalError("Experiment logger still has a handler open at upload time. Close it first.")

contents = None
try:
with open(filePath, 'r', encoding="utf8") as logFile:
contents = logFile.read()
with open(file_path, 'r', encoding="utf8") as log_file:
contents = log_file.read()
except Exception as err:
raise GladosInternalError(f"Failed to read log file for experiment {experimentId}") from err

Expand All @@ -73,18 +97,73 @@ def upload_experiment_log(experimentId: DocumentId):
payload = {
"experimentId": experimentId,
"logContents": contents
}
_callBackend(url, payload, "inserted log file into mongodb with id")
}
_call_backend(url, payload, "inserted log file into mongodb with id")

def get_experiment_with_id(experimentId: str):
"""Get the experiment data from the ID
Args:
experimentId (str): experiment id in mongo
def _callBackend(url, payload, logMsg):
Raises:
DatabaseConnectionError: couldn't connect to db
Returns:
json: json contents
"""
url = f'http://glados-service-backend:{os.getenv("BACKEND_PORT")}/getExperiment'
payload = {
"experimentId": experimentId
}
response = requests.post(url, json=payload, timeout=10)
if response.status_code == 200:
return response.json()['contents']
else:
raise DatabaseConnectionError("Error while getting experiment from backend!")

def update_exp_value(experimentId: str, field: str, newValue):
"""Update an experiment dynamically
Args:
experimentId (str): _description_
field (str): _description_
newValue (_type_): _description_
Raises:
DatabaseConnectionError: _description_
"""
url = f'http://glados-service-backend:{os.getenv("BACKEND_PORT")}/updateExperiment'
payload = {
"experimentId": experimentId,
"field": field,
"newValue": newValue
}
response = requests.post(url, json=payload)
if response.status_code == 200:
return
else:
raise DatabaseConnectionError("Error updating experiment document!")

def _call_backend(url, payload, log_msg):
"""calls the backend with provided args
Args:
url (str): backend url to be called
payload (json): payload to send to backend
logMsg (str): message to log
Raises:
DatabaseConnectionError: _description_
DatabaseConnectionError: _description_
"""
try:
response = requests.post(url, json=payload)
response = requests.post(url, json=payload, timeout=10)
if response.status_code == 200:
resultId = response.json().get('id')
if resultId:
explogger.info(f"{logMsg}: {resultId}")
result_id = response.json().get('_id')
if result_id:
explogger.info(f"{log_msg}: {result_id}")
else:
raise DatabaseConnectionError("Encountered error while writing document to MongoDB")
raise DatabaseConnectionError("Encountered error while contacting the backend!")
except Exception as err:
raise DatabaseConnectionError("Encountered error while writing document to MongoDB") from err

raise DatabaseConnectionError("Encountered error while contacting the backend!") from err
Loading

0 comments on commit c32af63

Please sign in to comment.