Skip to content

Commit

Permalink
Addressing more review comments: Update dicts in place
Browse files Browse the repository at this point in the history
  • Loading branch information
mkavulich committed Mar 15, 2023
1 parent 099a2d2 commit b88c4a7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 43 deletions.
36 changes: 20 additions & 16 deletions tests/WE2E/WE2E_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@

from utils import calculate_core_hours, create_expts_dict, print_WE2E_summary, write_monitor_file

def WE2E_summary(args):
    """
    Builds the dictionary of experiments from the parsed command-line arguments, computes core
    hours for it, writes the result to the monitor yaml file, and prints the summary.

    Args:
        args: Parsed arguments object; this function reads its ``yaml_file``, ``expt_dir`` and
              ``debug`` attributes. ``expt_dir`` takes precedence over ``yaml_file`` when
              both are set.
    Returns:
        None
    Raises:
        ValueError: If neither ``expt_dir`` nor ``yaml_file`` is set.
    """
    monitor_yaml = args.yaml_file
    expt_dir = args.expt_dir

    # Without an experiment directory or a yaml file there is nothing to summarize
    if not expt_dir and not monitor_yaml:
        raise ValueError(f'Bad arguments; run {__file__} -h for more information')

    if expt_dir:
        # The experiment directory wins; create_expts_dict also supplies the yaml file to write
        monitor_yaml, experiments = create_expts_dict(expt_dir)
    else:
        experiments = load_config_file(monitor_yaml)

    # calculate_core_hours updates the dictionary in place, so its return value is not used
    calculate_core_hours(experiments)
    write_monitor_file(monitor_yaml, experiments)

    # Print the final summary
    print_WE2E_summary(experiments, args.debug)


def setup_logging(debug: bool = False) -> None:
"""
Sets up logging, printing high-priority (INFO and higher) messages to screen, and printing all
Expand Down Expand Up @@ -53,19 +72,4 @@ def setup_logging(debug: bool = False) -> None:

setup_logging(args.debug)

yaml_file = args.yaml_file

# Set up dictionary of experiments
if args.expt_dir:
yaml_file, expts_dict = create_expts_dict(args.expt_dir)
elif args.yaml_file:
expts_dict = load_config_file(args.yaml_file)
else:
raise ValueError(f'Bad arguments; run {__file__} -h for more information')

# Calculate core hours and update yaml
expts_dict = calculate_core_hours(expts_dict)
write_monitor_file(yaml_file,expts_dict)

#Call function to print summary
print_WE2E_summary(expts_dict, args.debug)
WE2E_summary(args)
10 changes: 5 additions & 5 deletions tests/WE2E/monitor_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ def monitor_jobs(expts_dict: dict, monitor_file: str = '', procs: int = 1, debug

if procs > 1:
print(f'Starting experiments in parallel with {procs} processes')
expts_dict = update_expt_status_parallel(expts_dict, procs, True, debug)
update_expt_status_parallel(expts_dict, procs, True, debug)
else:
for expt in expts_dict:
logging.info(f"Starting experiment {expt} running")
expts_dict[expt] = update_expt_status(expts_dict[expt], expt, True, debug)
update_expt_status(expts_dict[expt], expt, True, debug)

write_monitor_file(monitor_file,expts_dict)

Expand All @@ -60,10 +60,10 @@ def monitor_jobs(expts_dict: dict, monitor_file: str = '', procs: int = 1, debug
while running_expts:
i += 1
if procs > 1:
expts_dict = update_expt_status_parallel(expts_dict, procs)
update_expt_status_parallel(expts_dict, procs)
else:
for expt in running_expts.copy():
expts_dict[expt] = update_expt_status(expts_dict[expt], expt)
update_expt_status(expts_dict[expt], expt)

for expt in running_expts.copy():
running_expts[expt] = expts_dict[expt]
Expand All @@ -89,7 +89,7 @@ def monitor_jobs(expts_dict: dict, monitor_file: str = '', procs: int = 1, debug
logging.info('Calculating core-hour usage and printing final summary')

# Calculate core hours and update yaml
expts_dict = calculate_core_hours(expts_dict)
calculate_core_hours(expts_dict)
write_monitor_file(monitor_file,expts_dict)

#Call function to print summary
Expand Down
33 changes: 11 additions & 22 deletions tests/WE2E/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
REPORT_WIDTH = 100
EXPT_COLUMN_WIDTH = 65
TASK_COLUMN_WIDTH = 40

def print_WE2E_summary(expts_dict: dict, debug: bool = False):
"""Function that creates a summary for the specified experiment
Expand Down Expand Up @@ -136,7 +137,7 @@ def create_expts_dict(expt_dir: str) -> dict:

return summary_file, expts_dict

def calculate_core_hours(expts_dict: dict) -> dict:
def calculate_core_hours(expts_dict: dict) -> None:
"""
Function takes in an experiment dictionary, reads the var_defns file for necessary information,
and calculates the core hours used by each task, updating expts_dict with this info
Expand All @@ -145,7 +146,7 @@ def calculate_core_hours(expts_dict: dict) -> dict:
expts_dict (dict): A dictionary containing the information needed to run
one or more experiments. See example file WE2E_tests.yaml
Returns:
dict : Experiments dictionary updated with core hours
None
"""

for expt in expts_dict:
Expand Down Expand Up @@ -180,7 +181,6 @@ def calculate_core_hours(expts_dict: dict) -> dict:
expts_dict[expt][task]['walltime'] / 3600
expts_dict[expt][task]['exact_count'] = False
expts_dict[expt][task]['core_hours'] = round(core_hours,2)
return expts_dict


def write_monitor_file(monitor_file: str, expts_dict: dict):
Expand All @@ -200,7 +200,7 @@ def write_monitor_file(monitor_file: str, expts_dict: dict):


def update_expt_status(expt: dict, name: str, refresh: bool = False, debug: bool = False,
submit: bool = True) -> dict:
submit: bool = True) -> None:
"""
This function reads the dictionary showing the location of a given experiment, runs a
`rocotorun` command to update the experiment (running new jobs and updating the status of
Expand Down Expand Up @@ -249,12 +249,12 @@ def update_expt_status(expt: dict, name: str, refresh: bool = False, debug: bool
workflow by calling rocotorun. If simply generating a report, set this
to False
Returns:
dict: The updated experiment dictionary.
None
"""

#If we are no longer tracking this experiment, return unchanged
if (expt["status"] in ['DEAD','ERROR','COMPLETE']) and not refresh:
return expt
return
# Update experiment, read rocoto database
rocoto_db = f"{expt['expt_dir']}/FV3LAM_wflow.db"
if submit:
Expand Down Expand Up @@ -293,8 +293,7 @@ def update_expt_status(expt: dict, name: str, refresh: bool = False, debug: bool
if not refresh:
logging.warning(f"Unable to read database {rocoto_db}\nCan not track experiment {name}")
expt["status"] = "ERROR"

return expt
return

for task in db:
# For each entry from rocoto database, store that task's info under a dictionary key named
Expand Down Expand Up @@ -322,7 +321,7 @@ def update_expt_status(expt: dict, name: str, refresh: bool = False, debug: bool
expt["status"] = "DYING"
else:
expt["status"] = "DEAD"
return expt
return
elif "RUNNING" in statuses:
expt["status"] = "RUNNING"
elif "QUEUED" in statuses:
Expand Down Expand Up @@ -364,10 +363,9 @@ def update_expt_status(expt: dict, name: str, refresh: bool = False, debug: bool
if expt["status"] in ["SUCCEEDED","STALLED","STUCK"]:
expt = compare_rocotostat(expt,name)

return expt

def update_expt_status_parallel(expts_dict: dict, procs: int, refresh: bool = False,
debug: bool = False) -> dict:
debug: bool = False) -> None:
"""
This function updates an entire set of experiments in parallel, drastically speeding up
the process if given enough parallel processes. Given an experiment dictionary, it will
Expand All @@ -385,7 +383,7 @@ def update_expt_status_parallel(expts_dict: dict, procs: int, refresh: bool = Fa
slow down the process drastically.
Returns:
dict: The updated dictionary of experiment dictionaries
None
"""

args = []
Expand All @@ -395,16 +393,7 @@ def update_expt_status_parallel(expts_dict: dict, procs: int, refresh: bool = Fa

# call update_expt_status() in parallel
with Pool(processes=procs) as pool:
output = pool.starmap(update_expt_status, args)

# Update dictionary with output from all calls to update_expt_status()
i = 0
for expt in expts_dict:
expts_dict[expt] = output[i]
i += 1

return expts_dict

pool.starmap(update_expt_status, args)


def print_test_info(txtfile: str = "WE2E_test_info.txt") -> None:
Expand Down

0 comments on commit b88c4a7

Please sign in to comment.