Add check duplicate pbs jobs functionality;
force pbs jobname to be same as the control expt directory name.
minghangli-uni committed Sep 6, 2024
1 parent c8a3ac0 commit 90c46d1
Showing 2 changed files with 195 additions and 54 deletions.
240 changes: 187 additions & 53 deletions expts_manager/Expts_manager.py
@@ -5,18 +5,18 @@
to set up control and perturbation experiments, modify configuration files,
and manage related utilities.
For more information, run `./Expts_manager.py -h`
Contact: Minghang Li <Minghang.Li1@anu.edu.au>
Dependencies:
argparse, glob, re, subprocess, numpy, git, f90nml, ruamel.yaml
Latest version: https://github.com/minghangli-uni/Expts_manager
Author: Minghang Li
License: Apache 2.0 License http://www.apache.org/licenses/LICENSE-2.0.txt
"""

# ===========================================================================
import os
import sys
import re
import copy
import subprocess
import shutil
import glob
import argparse
import warnings
@@ -138,6 +138,7 @@ def load_variables(self, yamlfile):

self.ctrl_nruns = self.indata.get("ctrl_nruns", 0)
self.run_namelists = self.indata.get("run_namelists", False)
self.check_duplicate_jobs = self.indata.get("check_duplicate_jobs", True)
self.check_skipping = self.indata.get("check_skipping", False)
self.force_restart = self.indata.get("force_restart", False)
self.startfrom = self.indata["startfrom"]
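# Illustrative sketch (not part of this commit) of the corresponding entries
# in the YAML input file; values are made up and other required keys are omitted:
#
#     check_duplicate_jobs: true    # new flag; defaults to true when omitted
#     ctrl_nruns: 1
#     run_namelists: false
#     startfrom: 0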
@@ -154,7 +155,7 @@ def _initialise_variables(self):
param_dict_change_list list[dict]: Specific for MOM_input, the list containing tuning parameter dictionaries.
commt_dict_change (dict): Specific for MOM_input, dictionary of comments for parameters.
append_group_list (list): Specific for f90nml, the list containing tuning parameters.
expt_names list[str]: Optional user-defined directory names for perturbation experiments.
expt_names list(str): Optional user-defined directory names for perturbation experiments.
"""
self.nml_ctrl = None
self.tag_model = None
@@ -261,8 +262,19 @@ def manage_ctrl_expt(self):
# Checks the current state of the repo, commits relevant changes.
self._check_and_commit_changes()

# check existing PBS jobs
pbs_jobs = self._output_existing_pbs_jobs()

# check for duplicated running jobs
if self.check_duplicate_jobs:
duplicated_bool = self._check_duplicated_jobs(pbs_jobs, self.base_dir_name)
else:
duplicated_bool = False

# start control runs, count existing runs and do additional runs if needed
self._start_experiment_runs(base_path, self.base_dir_name)
self._start_experiment_runs(
base_path, self.base_dir_name, duplicated_bool, ctrl_nruns
)

def _clone_template_repo(self):
"""
@@ -278,18 +290,17 @@ def _extract_config_via_commit(self):
"""
templaterepo = git.Repo(self.base_path)
print(
f"Checking out commit {self.base_commit} and creating new branch {self.base_branch_name}!"
f"Check out commit {self.base_commit} and creat new branch {self.base_branch_name}!"
)
templaterepo.git.checkout(
"-b", self.base_branch_name, self.base_commit
) # checkout the new branch from the specific template commit
# checkout the new branch from the specific template commit
templaterepo.git.checkout("-b", self.base_branch_name, self.base_commit)

def _copy_diag_table(self, path):
"""
Copies the diagnostic table (`diag_table`) to the specified path if a path is defined.
"""
if self.diag_path:
command = f"scp {os.path.join(self.diag_path,"diag_table")} {path}"
command = f"scp {os.path.join(self.diag_path,'diag_table')} {path}"
subprocess.run(command, shell=True, check=False)
print(f"Copy diag_table to {path}")
else:
@@ -319,9 +330,8 @@ def _update_contrl_params(self):
namelist and MOM_input for the control experiment if needed.
"""
for file_name in os.listdir(self.base_path):
if file_name.endswith("_in") or file_name.endswith(
".nml"
): # Update parameters from namelists
# Update parameters from namelists
if file_name.endswith("_in") or file_name.endswith(".nml"):
yaml_data = self.indata.get(file_name, None)
if yaml_data:
if (
@@ -337,19 +347,16 @@ def _update_contrl_params(self):
yaml_data["dynamics_nml"]["cosw"] = cosw
yaml_data["dynamics_nml"]["sinw"] = sinw
del yaml_data["dynamics_nml"]["turning_angle"]
nml_ctrl = f90nml.read(
os.path.join(self.base_path, file_name)
) # read existing namelist file from the control experiment
self._update_config_entries(
nml_ctrl, yaml_data
) # update the namelist with the YAML input file
nml_ctrl.write(
os.path.join(self.base_path, file_name), force=True
) # write the updated namelist back to the file

if file_name in (
("nuopc.runconfig", "config.yaml")
): # Update config entries from `nuopc.runconfig` and `config_yaml`

# read existing namelist file from the control experiment
nml_ctrl = f90nml.read(os.path.join(self.base_path, file_name))
# update the namelist with the YAML input file
self._update_config_entries(nml_ctrl, yaml_data)
# write the updated namelist back to the file
nml_ctrl.write(os.path.join(self.base_path, file_name), force=True)

# Update config entries from `nuopc.runconfig` and `config_yaml`
if file_name in ("nuopc.runconfig", "config.yaml"):
yaml_data = self.indata.get(file_name, None)
if yaml_data:
tmp_file_path = os.path.join(self.base_path, file_name)
@@ -359,25 +366,32 @@
self.write_nuopc_config(file_read, tmp_file_path)
elif file_name == "config.yaml":
file_read = self._read_ryaml(tmp_file_path)
yaml_data["jobname"] = self.base_dir_name
if yaml_data["jobname"] != self.base_dir_name:
raise ValueError(
f"jobname must be the same as {self.base_dir_name}!"
)
self._update_config_entries(file_read, yaml_data)
self._write_ryaml(file_read, tmp_file_path)
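# Illustrative behaviour of the new jobname guard (names are made up): if the
# control experiment directory is `my_ctrl_expt` but the YAML input sets
# `jobname: other_name` under `config.yaml`, setup now fails with
# `ValueError: jobname must be the same as my_ctrl_expt!` instead of silently
# overwriting the jobname as the removed line above did.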

if file_name == "MOM_input": # Update parameters from `MOM_input`
# Update parameters from `MOM_input`
if file_name == "MOM_input":
yaml_data = self.indata.get(file_name, None)
if yaml_data:
# parse existing MOM_input
MOM_inputParser = self._parser_mom6_input(
os.path.join(self.base_path, file_name)
) # parse existing MOM_input
)
param_dict = MOM_inputParser.param_dict # read parameter dictionary
commt_dict = MOM_inputParser.commt_dict # read comment dictionary
param_dict.update(yaml_data)
# overwrite to the same `MOM_input`
MOM_inputParser.writefile_MOM_input(
os.path.join(self.base_path, file_name)
) # overwrite to the same `MOM_input`
)

if (
file_name == "nuopc.runseq"
): # Update only coupling timestep from `nuopc.runseq`
# Update only coupling timestep from `nuopc.runseq`
if file_name == "nuopc.runseq":
yaml_data = self.indata.get("cpl_dt", None)
if yaml_data:
nuopc_runseq_file = os.path.join(self.base_path, file_name)
@@ -391,16 +405,14 @@ def _check_and_commit_changes(self):
repo = git.Repo(self.base_path)
print(f"Current base branch is: {repo.active_branch.name}")
deleted_files = self._get_deleted_files(repo)
# remove deleted files or `work` directory
if deleted_files:
repo.index.remove(
deleted_files, r=True
) # remove deleted files or `work` directory
repo.index.remove(deleted_files, r=True)
untracked_files = self._get_untracked_files(repo)
changed_files = self._get_changed_files(repo)
staged_files = set(untracked_files + changed_files)
self._restore_swp_files(
repo, staged_files
) # restore *.swp files in case users opened any files while the case is running
# restore *.swp files in case users opened any files while the case is running
self._restore_swp_files(repo, staged_files)
commit_message = f"Control experiment setup: Configure `{self.base_branch_name}` branch by `{self.yamlfile}`\n committed files/directories {staged_files}!"
if staged_files:
repo.index.add(staged_files)
@@ -655,8 +667,19 @@ def setup_expts(self, parameter_block):
# clean `work` directory for failed jobs
self._clean_workspace(expt_path)

# check existing PBS jobs
pbs_jobs = self._output_existing_pbs_jobs()

# check for duplicated running jobs
if self.check_duplicate_jobs:
duplicated_bool = self._check_duplicated_jobs(pbs_jobs, expt_name)
else:
duplicated_bool = False

# start runs, count existing runs and do additional runs if needed
self._start_experiment_runs(expt_path, expt_name)
self._start_experiment_runs(
expt_path, expt_name, duplicated_bool, self.nruns
)

self.expt_names = None # reset to None after the loop so user-defined perturbation experiment names can be set afresh next time!

@@ -698,7 +721,7 @@ def _generate_expt_directory(self, expt_path, parameter_block, indx):

def _update_mom6_params(self, expt_path, param_dict):
"""
Updates MOM6 parameters in the `MOM_override` file.
Updates MOM6 parameters in the 'MOM_override' file.
Args:
expt_path (str): The path to the experiment directory.
@@ -842,19 +865,88 @@ def _clean_workspace(self, dir_path):
subprocess.run(command, shell=True, check=False)
print(f"Clean up a failed job {work_dir} and prepare it for resubmission.")

def _start_experiment_runs(self, expt_path, expt_name):
def _output_existing_pbs_jobs(self):
"""
Checks existing PBS jobs via `qstat -f` and parses the output into a dictionary.
"""
current_job_status_path = os.path.join(self.dir_manager, "current_job_status")
command = f"qstat -f > {current_job_status_path}"
subprocess.run(command, shell=True, check=False)

pbs_jobs = {}
current_key = None
current_value = ""
job_id = None
with open(current_job_status_path, "r") as f:
pbs_job_file = f.read()

pbs_job_file = pbs_job_file.replace("\t", "        ") # normalise tabs to 8 spaces so continuation indents are uniform

for line in pbs_job_file.splitlines():
line = line.rstrip()
if not line:
continue
if line.startswith("Job Id:"):
job_id = line.split(":", 1)[1].strip()
pbs_jobs[job_id] = {}
current_key = None
current_value = ""
elif line.startswith(" ") and current_key: # 8 indents multi-line
current_value += line.strip()
elif line.startswith(" ") and " = " in line: # 4 indents for new pair
# Save the previous multi-line value
if current_key:
pbs_jobs[job_id][current_key] = current_value.strip()
key, value = line.split(" = ", 1) # save key
current_key = key.strip()
current_value = value.strip()
return pbs_jobs
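# Illustrative (made-up) fragment of the `qstat -f` output the parser above
# expects: a `Job Id:` line opens each record, a 4-space indent starts a new
# `key = value` pair, and an 8-space indent continues the previous value:
#
#     Job Id: 12345678.gadi-pbs
#         Job_Name = expt_a
#         Job_Owner = user123@gadi
#         Error_Path = gadi.nci.org.au:/scratch/x00/user123/test/expt_a/expt_a
#             .e12345678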

def _check_duplicated_jobs(self, pbs_jobs, expt_name):
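"""
Checks whether the experiment `expt_name` already has a queued or running
PBS job under the same parent directory, to avoid submitting duplicate runs.
Args:
pbs_jobs (dict): Existing PBS jobs keyed by job ID, as returned by `_output_existing_pbs_jobs`.
expt_name (str): The name of the control/perturbation experiment.
Returns:
bool: True if a duplicate run is detected, otherwise False.
"""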

def extract_current_and_parent_folder(tmp_path):
# the experiment directory name (base_dir_name or expt_name) from the job's Error_Path
current_folder = tmp_path.split("/")[-2]

# the parent directory that contains the experiment directory
parent_folder = tmp_path.split("/")[-3]

return current_folder, parent_folder

parent_folders = {}
for job_id, job_info in pbs_jobs.items():
current_folder, parent_folder = extract_current_and_parent_folder(
job_info["Error_Path"]
)
if parent_folder not in parent_folders:
parent_folders[parent_folder] = set()
parent_folders[parent_folder].add(current_folder)

duplicated = False
for parent_folder, folders in parent_folders.items():
if expt_name in folders:
if len(folders) > 1:
print(
f"You have duplicated runs for folder '{expt_name}' in the same folder '{parent_folder}', "
f"hence not submitting this job!\n"
)
duplicated = True
return duplicated
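# Worked example (job IDs and paths are made up): given
#
#     pbs_jobs = {
#         "123.gadi-pbs": {"Error_Path": "gadi:/scratch/x00/usr/test/expt_a/expt_a.e123"},
#         "124.gadi-pbs": {"Error_Path": "gadi:/scratch/x00/usr/test/expt_b/expt_b.e124"},
#     }
#
# `parent_folders` becomes {"test": {"expt_a", "expt_b"}}, so
# `_check_duplicated_jobs(pbs_jobs, "expt_a")` prints the warning and returns
# True, and the pending `expt_a` submission is skipped.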

def _start_experiment_runs(self, expt_path, expt_name, duplicated, num_runs):
"""
Run perturbation experiments.
Runs perturbation experiments.
Args:
expt_path (str): The path to the control/perturbation experiment directory.
expt_name (str): The name of the control/perturbation experiment.
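duplicated (bool): Whether this experiment already has a queued or running PBS job.
num_runs (int): Total number of runs requested for this experiment.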
"""
if self.nruns > 0:

def runs():
doneruns = len(
glob.glob(os.path.join(expt_path, "archive", "output[0-9][0-9][0-9]*"))
)
newruns = self.nruns - doneruns
newruns = num_runs - doneruns
if newruns > 0:
print(f"\nRun experiment -n {newruns}\n")
command = f"cd {expt_path} && payu run -n {newruns} -f"
@@ -864,10 +956,54 @@ def _start_experiment_runs(self, expt_path, expt_name):
print(
f"-- `{expt_name}` has already completed {doneruns} runs! Hence, stopping further runs.\n"
)
else:

if not duplicated:
if num_runs > 0:
runs()
else:
print(
f"-- number of runs is {num_runs}, hence no new experiments will start!\n"
)
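# Worked example (illustrative): if `archive/` already contains output000,
# output001 and output002, then doneruns = 3; with num_runs = 5 and no
# duplicate job, the manager issues `cd <expt_path> && payu run -n 2 -f`,
# while num_runs <= 3 reports that the runs are already complete.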

def _start_experiment_runs2(self, expt_path, expt_name, pbs_jobs, num_runs):
"""
Runs perturbation experiments.
Args:
expt_path (str): The path to the control/perturbation experiment directory.
expt_name (str): The name of the control/perturbation experiment.
pbs_jobs (dict): Existing PBS jobs parsed from `qstat -f`.
num_runs (int): Total number of runs requested for this experiment.
"""
print(expt_name)

def runs():
doneruns = len(
glob.glob(os.path.join(expt_path, "archive", "output[0-9][0-9][0-9]*"))
)
newruns = num_runs - doneruns
if newruns > 0:
print(f"\nRun experiment -n {newruns}\n")
command = f"cd {expt_path} && payu run -n {newruns} -f"
subprocess.run(command, shell=True, check=False)
print("\n")
else:
print(
f"-- `{expt_name}` has already completed {doneruns} runs! Hence, stopping further runs.\n"
)

if pbs_jobs and any(
job_info["Error_Path"].split("/")[-2] == expt_name
for job_info in pbs_jobs.values()
):
print(
f"-- `{expt_name}` is running now, hence skip resubmitting the PBS job!\n"
)
else:
if num_runs > 0:
runs()
else:
print(
f"-- number of runs is {num_runs}, hence no new experiments will start!\n"
)

def _check_skipping(self, param_dict, nml_group, parameter_block, expt_path):
"""
@@ -925,11 +1061,9 @@ def _check_skipping(self, param_dict, nml_group, parameter_block, expt_path):
)
return

# might need MOM_parameter.all, because many parameters take default values and hence do not show up in `MOM_input`
if self.tag_model == "mom6":
# TODO
pass
if self.tag_model == "cpl_dt":
if (
self.tag_model == "mom6"
): # might need MOM_parameter.all, because many parameters take default values and hence do not show up in `MOM_input`
# TODO
pass
