From e7e43d2a56652271cde0abbda0d2bab15ebe76c9 Mon Sep 17 00:00:00 2001 From: David Huber Date: Fri, 15 Nov 2024 08:07:30 -0600 Subject: [PATCH 01/11] First crack at archiving the expdir and git status --- parm/archive/expdir.yaml.j2 | 13 +++ parm/archive/master_gdas.yaml.j2 | 9 +- parm/archive/master_gefs.yaml.j2 | 7 ++ parm/archive/master_gfs.yaml.j2 | 7 ++ parm/config/gfs/config.base | 6 +- scripts/exglobal_archive.py | 25 +++--- sorc/wxflow | 2 +- ush/python/pygfs/task/archive.py | 138 ++++++++++++++++++++++++++++++- 8 files changed, 187 insertions(+), 20 deletions(-) create mode 100644 parm/archive/expdir.yaml.j2 diff --git a/parm/archive/expdir.yaml.j2 b/parm/archive/expdir.yaml.j2 new file mode 100644 index 0000000000..b0ebbcd296 --- /dev/null +++ b/parm/archive/expdir.yaml.j2 @@ -0,0 +1,13 @@ +{% set cycle_YMDH = current_cycle | to_YMDH %} + +expdir: + name: "EXPDIR" + changedir: "{{ EXPDIR }}" + target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar" + required: + - "{{ EXPDIR | relpath(EXPDIR) }}/config.*" + - "{{ EXPDIR | relpath(EXPDIR) }}/{{ pslot }}.db" + - "{{ EXPDIR | relpath(EXPDIR) }}/{{ pslot }}.xml" + {% if ARCH_HASHES or ARCH_DIFFS %} + - "{{ EXPDIR | relpath(EXPDIR) }}/git_info.log" + {% endif %} diff --git a/parm/archive/master_gdas.yaml.j2 b/parm/archive/master_gdas.yaml.j2 index 11e83d387b..b3d6560012 100644 --- a/parm/archive/master_gdas.yaml.j2 +++ b/parm/archive/master_gdas.yaml.j2 @@ -40,7 +40,7 @@ datasets: # Determine if we will save restart ICs or not (only valid for cycled) {% set save_warm_start_forecast, save_warm_start_cycled = ( False, False ) %} - {% if ARCH_CYC == cycle_HH | int%} + {% if ARCH_CYC == cycle_HH | int %} # Save the forecast-only cycle ICs every ARCH_WARMICFREQ or ARCH_FCSTICFREQ days {% if (current_cycle - SDATE).days % ARCH_WARMICFREQ == 0 %} {% set save_warm_start_forecast = True %} @@ -97,3 +97,10 @@ datasets: # End of restart checking {% endif %} + +# Archive the EXPDIR if requested +{% if archive_expdir %} +{% filter indent(width=4) %} +{% include "expdir.yaml.j2" %} +{% endfilter %} +{% endif %} diff --git a/parm/archive/master_gefs.yaml.j2 b/parm/archive/master_gefs.yaml.j2 index 5dc046dcfd..e76d7c9f7a 100644 --- a/parm/archive/master_gefs.yaml.j2 +++ b/parm/archive/master_gefs.yaml.j2 @@ -10,3 +10,10 @@ datasets: {% include "gefs_extracted_ice.yaml.j2" %} {% include "gefs_extracted_wave.yaml.j2" %} {% endfilter %} + +# Archive the EXPDIR if requested +{% if archive_expdir %} +{% filter indent(width=4) %} +{% include "expdir.yaml.j2" %} +{% endfilter %} +{% endif %} diff --git a/parm/archive/master_gfs.yaml.j2 b/parm/archive/master_gfs.yaml.j2 index ab9a00c95e..fa77d29c14 100644 --- a/parm/archive/master_gfs.yaml.j2 +++ b/parm/archive/master_gfs.yaml.j2 @@ -98,3 +98,10 @@ datasets: {% endfilter %} {% endif %} {% endif %} + +# Archive the EXPDIR if requested +{% if archive_expdir %} +{% filter indent(width=4) %} +{% include "expdir.yaml.j2" %} +{% endfilter %} +{% endif %} diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index ccb05abe88..92e0b135b7 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -464,9 +464,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "Both HPSS and local archiving selected. Please choose one or the other." exit 2 fi -export ARCH_CYC=00 # Archive data at this cycle for warm_start capability +export ARCH_WARMICCYC=00 # Archive data at this cycle for warm_start capability export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability +export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database +export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only +export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each +export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; note git must be newer than v2.14 # The monitor jobs are not yet supported for JEDIATMVAR. if [[ ${DO_JEDIATMVAR} = "YES" ]]; then diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index 4ee9e5ed0e..0ecd4db327 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -3,7 +3,7 @@ import os from pygfs.task.archive import Archive -from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, chdir, logit +from wxflow import AttrDict, Logger, cast_strdict_as_dtypedict, logit, chdir # initialize root logger logger = Logger(level=os.environ.get("LOGGING_LEVEL", "DEBUG"), colored_log=True) @@ -32,7 +32,8 @@ def main(): 'AERO_ANL_RUN', 'AERO_FCST_RUN', 'DOIBP_WAV', 'DO_JEDIOCNVAR', 'NMEM_ENS', 'DO_JEDIATMVAR', 'DO_VRFY_OCEANDA', 'FHMAX_FITS', 'waveGRD', 'IAUFHRS', 'DO_FIT2OBS', 'NET', 'FHOUT_HF_GFS', 'FHMAX_HF_GFS', 'REPLAY_ICS', - 'OFFSET_START_HOUR'] + 'OFFSET_START_HOUR', 'ARCH_EXPDIR', 'EXPDIR', 'ARCH_EXPDIR_FREQ', 'ARCH_HASHES', + 'ARCH_DIFFS', 'SDATE', 'EDATE', 'HOMEgfs'] archive_dict = AttrDict() for key in keys: @@ -47,21 +48,17 @@ def main(): if archive_dict[key] is None: print(f"Warning: key ({key}) not found in task_config!") - cwd = os.getcwd() + with chdir(config.ROTDIR): - os.chdir(config.ROTDIR) + # Determine which archives to create + arcdir_set, atardir_sets = archive.configure(archive_dict) - # Determine which archives to create - arcdir_set, atardir_sets = archive.configure(archive_dict) + # Populate the product archive (ARCDIR) + archive.execute_store_products(arcdir_set) - # Populate the product archive (ARCDIR) - archive.execute_store_products(arcdir_set) - - # Create the backup tarballs and store in ATARDIR - for atardir_set in atardir_sets: - archive.execute_backup_dataset(atardir_set) - - os.chdir(cwd) + # Create the backup tarballs and store in ATARDIR + for atardir_set in atardir_sets: + archive.execute_backup_dataset(atardir_set) if __name__ == '__main__': diff --git a/sorc/wxflow b/sorc/wxflow index e1ef697430..a7b49e9cc7 160000 --- a/sorc/wxflow +++ b/sorc/wxflow @@ -1 +1 @@ -Subproject commit e1ef697430c09d2b1a0560f21f11c7a32ed5f3e2 +Subproject commit a7b49e9cc76ef4b50cc1c28d4b7959ebde99c5f5 diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index 108cd2ed27..1c9d6cd78f 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -9,8 +9,9 @@ from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime, - to_YMDH) + to_YMDH, which, chdir, ProcessError) +git_filename = "git_info.log" logger = getLogger(__name__.split('.')[-1]) @@ -109,6 +110,14 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str self.tar_cmd = "" return arcdir_set, [] + # Determine if we are archiving the EXPDIR this cycle + arch_dict.archive_expdir = False + if arch_dict.ARCH_EXPDIR: + arch_dict.archive_expdir = Archive._archive_expdir(arch_dict) + # If requested, get workflow git hashes/statuses/diffs for EXPDIR archiving + if arch_dict.archive_expdir and (arch_dict.ARCH_HASHES or arch_dict.ARCH_DIFFS): + Archive._pop_git_info(arch_dict) + master_yaml = "master_" + arch_dict.RUN + ".yaml.j2" parsed_sets = parse_j2yaml(os.path.join(archive_parm, master_yaml), @@ -244,7 +253,7 @@ def _has_rstprod(fileset: List) -> bool: return False @logit(logger) - def _protect_rstprod(self, atardir_set: Dict[str, any]) -> None: + def _protect_rstprod(self, atardir_set: Dict[str, Any]) -> None: """ Changes the group of the target tarball to rstprod and the permissions to 640. If this fails for any reason, attempt to delete the file before exiting. @@ -289,7 +298,7 @@ def _create_tarball(target: str, fileset: List) -> None: tarball.add(filename) @logit(logger) - def _gen_relative_paths(self, root_path: str) -> Dict: + def _gen_relative_paths(self, root_path: str) -> Dict[str, Any]: """Generate a dict of paths in self.task_config relative to root_path Parameters @@ -417,3 +426,126 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s replace_string_from_to_file(in_track_p_file, out_track_p_file, "AVNO", pslot4) return + + @staticmethod + @logit(logger) + def _archive_expdir(arch_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + This function checks if the EXPDIR should be archived this RUN/cycle + + Parameters + ---------- + arch_dict: Dict + Dictionary with required parameters, including the following: + + current_cycle: Datetime + Date of the current cycle. + SDATE: Datetime + Starting cycle date. + EDATE: Datetime + Ending cycle date. + NET: str + The workflow type (gfs or gefs) + ARCH_EXPDIR_FREQ: int + Frequency to perform EXPDIR archiving + """ + + # Get commonly used variables + current_cycle = arch_dict.current_cycle + sdate = arch_dict.SDATE + edate = arch_dict.EDATE + # Convert frequency to seconds from hours + freq = arch_dict.ARCH_EXPDIR_FREQ * 3600 + + # Skip gfs and enkf cycled RUNs (only archive during gdas RUNs) + # (do not skip forecast-only, regardless of RUN) + if arch_dict.NET == "gfs" and arch_dict.MODE == "cycled" and arch_dict.RUN != "gdas": + return False + + # Determine if we should skip this cycle + # If the frequency is set to 0, only run on sdate and edate + if freq == 0: + if current_cycle != sdate or current_cycle != edate: + return False + # Otherwise, the frequency is in hours + elif (sdate - current_cycle).total_seconds() % freq != 0: + return False + + # Looks like we are archiving the EXPDIR + return True + + @staticmethod + @logit(logger) + def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + This function checks the configuration options ARCH_HASHES and ARCH_DIFFS + and ARCH_EXPDIR_FREQ to determine if the git hashes and/or diffs should be + added to the EXPDIR for archiving and execute the commands. The hashes and + diffs will be stored in EXPDIR/git_info.log. + + Parameters + ---------- + arch_dict: Dict + Dictionary with required parameters, including the following: + + EXPDIR: str + Location of the EXPDIR + HOMEgfs: str + Location of the HOMEgfs (the global workflow) + ARCH_HASHES: bool + Whether to archive git hashes of the workflow and submodules + ARCH_DIFFS: bool + Whether to archive git diffs of the workflow and submodules + """ + + # Get commonly used variables + arch_hashes = arch_dict.ARCH_HASHES + arch_diffs = arch_dict.ARCH_DIFFS + homegfs = arch_dict.HOMEgfs + expdir = arch_dict.EXPDIR + + # Find the git command + git = which('git') + if git is None: + raise FileNotFoundError("FATAL ERROR: the git command could not be found!") + + output = "" + # Navigate to HOMEgfs to run the git commands + with chdir(homegfs): + + # Are we running git to get hashes? + if arch_hashes: + output += "Global workflow hash:\n" + + try: + output += git("rev-parse", "HEAD", output=str) + output += "\nSubmodule hashes:\n" + output += git("submodule", "status", output=str) + except ProcessError: + raise OSError("FATAL ERROR Failed to run git") + + # Are we running git to get diffs? + if arch_diffs: + output += "Global workflow diffs:\n" + # This command will only work on git v2.14+ + try: + output += git("diff", "--submodule=diff", output=str) + except ProcessError: + # The version of git may be too old. See if we can run just a surface diff. + try: + output += git("diff", output=str) + print("WARNING git was unable to do a recursive diff.\n" + "Only a top level diff was performed.\n" + "Note that the git version must be >= 2.14 for this feature.") + except ProcessError: + raise OSError("FATAL ERROR Failed to run 'git diff'") + + # Write out to the log file + try: + with open(os.path.join(expdir, git_filename), 'w') as output_file: + output_file.write(output) + except OSError: + fname = os.path.join(expdir, git_filename) + raise OSError(f"FATAL ERROR Unable to write git output to '{fname}'") + + return From 2746e46202d155a301d0ec18dfd6bf10f83e92a5 Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 18 Nov 2024 09:45:32 -0600 Subject: [PATCH 02/11] Add flake8 rules for the global workflow --- .flake8 | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .flake8 diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000000..35bff9e2f4 --- /dev/null +++ b/.flake8 @@ -0,0 +1,6 @@ +[flake8] +exclude = .git,.github,venv,__pycache__,docs/conf.py,old,build,dist +max-line-length = 160 +per-file-ignores = + # imported but unused + __init__.py: F401 From 157984476497e299949dd4f080c2ce8f838fdd86 Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 18 Nov 2024 10:10:03 -0600 Subject: [PATCH 03/11] Copy expdir to ROTDIR before tar'ing --- parm/archive/expdir.yaml.j2 | 22 ++++++++++--- parm/config/gefs/config.base | 8 +++-- parm/config/gfs/config.base | 8 ++--- scripts/exglobal_archive.py | 3 ++ ush/python/pygfs/task/archive.py | 55 +++++++++++++++++++++++++------- 5 files changed, 73 insertions(+), 23 deletions(-) diff --git a/parm/archive/expdir.yaml.j2 b/parm/archive/expdir.yaml.j2 index b0ebbcd296..2ca358c0ad 100644 --- a/parm/archive/expdir.yaml.j2 +++ b/parm/archive/expdir.yaml.j2 @@ -2,12 +2,24 @@ expdir: name: "EXPDIR" - changedir: "{{ EXPDIR }}" + # Copy the experiment files from the EXPDIR into the ROTDIR for archiving + FileHandler: + mkdir: + - "{{ temp_expdir }}" + copy: + {% for config in glob(EXPDIR ~ "/config.*") %} + - [ "{{ config }}", "{{ temp_expdir }}/." ] + {% endfor %} + - [ "{{ EXPDIR }}/{{ PSLOT }}.db", "{{ temp_expdir }}/." ] + - [ "{{ EXPDIR }}/{{ PSLOT }}.xml", "{{ temp_expdir }}/." ] + {% if ARCH_HASHES or ARCH_DIFFS %} + - [ "{{ EXPDIR }}/git_info.log", "{{ temp_expdir }}/." ] + {% endif %} target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar" required: - - "{{ EXPDIR | relpath(EXPDIR) }}/config.*" - - "{{ EXPDIR | relpath(EXPDIR) }}/{{ pslot }}.db" - - "{{ EXPDIR | relpath(EXPDIR) }}/{{ pslot }}.xml" + - "expdir/config.*" + - "expdir/{{ PSLOT }}.db" + - "expdir/{{ PSLOT }}.xml" {% if ARCH_HASHES or ARCH_DIFFS %} - - "{{ EXPDIR | relpath(EXPDIR) }}/git_info.log" + - "expdir/git_info.log" {% endif %} diff --git a/parm/config/gefs/config.base b/parm/config/gefs/config.base index 13f286c494..477fa8cd2e 100644 --- a/parm/config/gefs/config.base +++ b/parm/config/gefs/config.base @@ -329,9 +329,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "Both HPSS and local archiving selected. Please choose one or the other." exit 3 fi -export ARCH_CYC=00 # Archive data at this cycle for warm_start capability -export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability +export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities +export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability +export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database +export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only +export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR +export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR export DELETE_COM_IN_ARCHIVE_JOB="YES" # NO=retain ROTDIR. YES default in arch.sh and earc.sh. diff --git a/parm/config/gfs/config.base b/parm/config/gfs/config.base index cb6b087705..558ec46727 100644 --- a/parm/config/gfs/config.base +++ b/parm/config/gfs/config.base @@ -464,13 +464,13 @@ if [[ ${HPSSARCH} = "YES" ]] && [[ ${LOCALARCH} = "YES" ]]; then echo "Both HPSS and local archiving selected. Please choose one or the other." exit 3 fi -export ARCH_WARMICCYC=00 # Archive data at this cycle for warm_start capability -export ARCH_WARMICFREQ=4 # Archive frequency in days for warm_start capability +export ARCH_CYC=00 # Archive data at this cycle for warm start and/or forecast-only capabilities +export ARCH_WARMICFREQ=4 # Archive frequency in days for warm start capability export ARCH_FCSTICFREQ=1 # Archive frequency in days for gdas and gfs forecast-only capability export ARCH_EXPDIR='YES' # Archive the EXPDIR configs, XML, and database export ARCH_EXPDIR_FREQ=0 # How often to archive the EXPDIR in hours or 0 for first and last cycle only -export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each -export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; note git must be newer than v2.14 +export ARCH_HASHES='YES' # Archive the hashes of the GW and submodules and 'git status' for each; requires ARCH_EXPDIR +export ARCH_DIFFS='NO' # Archive the output of 'git diff' for the GW; requires ARCH_EXPDIR # The monitor jobs are not yet supported for JEDIATMVAR. if [[ ${DO_JEDIATMVAR} = "YES" ]]; then diff --git a/scripts/exglobal_archive.py b/scripts/exglobal_archive.py index 0ecd4db327..f36fa8477c 100755 --- a/scripts/exglobal_archive.py +++ b/scripts/exglobal_archive.py @@ -60,6 +60,9 @@ def main(): for atardir_set in atardir_sets: archive.execute_backup_dataset(atardir_set) + # Clean up any temporary files + archive.clean() + if __name__ == '__main__': main() diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index 1c9d6cd78f..e133752f4f 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -8,8 +8,8 @@ from typing import Any, Dict, List from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, - chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime, - to_YMDH, which, chdir, ProcessError) + chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, rmdir, + strftime, to_YMDH, which, chdir, ProcessError) git_filename = "git_info.log" logger = getLogger(__name__.split('.')[-1]) @@ -44,6 +44,9 @@ def __init__(self, config: Dict[str, Any]) -> None: # Extend task_config with path_dict self.task_config = AttrDict(**self.task_config, **path_dict) + # Boolean used for cleanup if the EXPDIR was archived + self.archive_expdir = False + @logit(logger) def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]): """Determine which tarballs will need to be created. @@ -111,12 +114,16 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str return arcdir_set, [] # Determine if we are archiving the EXPDIR this cycle - arch_dict.archive_expdir = False + self.temp_expdir = "" if arch_dict.ARCH_EXPDIR: - arch_dict.archive_expdir = Archive._archive_expdir(arch_dict) - # If requested, get workflow git hashes/statuses/diffs for EXPDIR archiving - if arch_dict.archive_expdir and (arch_dict.ARCH_HASHES or arch_dict.ARCH_DIFFS): - Archive._pop_git_info(arch_dict) + self.archive_expdir, self.temp_expdir = Archive._archive_expdir(arch_dict) + arch_dict.temp_expdir = self.temp_expdir + arch_dict.archive_expdir = self.archive_expdir + + if self.archive_expdir: + # If requested, get workflow hashes/statuses/diffs for EXPDIR archiving + if arch_dict.ARCH_HASHES or arch_dict.ARCH_DIFFS: + Archive._pop_git_info(arch_dict) master_yaml = "master_" + arch_dict.RUN + ".yaml.j2" @@ -204,6 +211,12 @@ def _create_fileset(atardir_set: Dict[str, Any]) -> List: """ fileset = [] + # Check if any external files need to be brought into the ROTDIR (i.e. EXPDIR contents) + if "FileHandler" in atardir_set: + # Run the file handler to stage files for archiving + FileHandler(atardir_set["FileHandler"]).sync() + + # Check that all required files are present and add them to the list of files to archive if "required" in atardir_set: if atardir_set.required is not None: for item in atardir_set.required: @@ -213,6 +226,7 @@ def _create_fileset(atardir_set: Dict[str, Any]) -> List: for entry in glob_set: fileset.append(entry) + # Check for optional files and add found items to the list of files to archive if "optional" in atardir_set: if atardir_set.optional is not None: for item in atardir_set.optional: @@ -429,9 +443,11 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s @staticmethod @logit(logger) - def _archive_expdir(arch_dict: Dict[str, Any]) -> Dict[str, Any]: + def _archive_expdir(arch_dict: Dict[str, Any]) -> (bool, str): """ This function checks if the EXPDIR should be archived this RUN/cycle + and returns the temporary path in the ROTDIR where the EXPDIR will be + copied to for archiving. Parameters ---------- @@ -448,6 +464,8 @@ def _archive_expdir(arch_dict: Dict[str, Any]) -> Dict[str, Any]: The workflow type (gfs or gefs) ARCH_EXPDIR_FREQ: int Frequency to perform EXPDIR archiving + ROTDIR: str + Full path to the ROTDIR """ # Get commonly used variables @@ -460,19 +478,19 @@ def _archive_expdir(arch_dict: Dict[str, Any]) -> Dict[str, Any]: # Skip gfs and enkf cycled RUNs (only archive during gdas RUNs) # (do not skip forecast-only, regardless of RUN) if arch_dict.NET == "gfs" and arch_dict.MODE == "cycled" and arch_dict.RUN != "gdas": - return False + return False, "" # Determine if we should skip this cycle # If the frequency is set to 0, only run on sdate and edate if freq == 0: if current_cycle != sdate or current_cycle != edate: - return False + return False, "" # Otherwise, the frequency is in hours elif (sdate - current_cycle).total_seconds() % freq != 0: - return False + return False, "" # Looks like we are archiving the EXPDIR - return True + return True, os.path.join(arch_dict.ROTDIR, "expdir") @staticmethod @logit(logger) @@ -549,3 +567,16 @@ def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]: raise OSError(f"FATAL ERROR Unable to write git output to '{fname}'") return + + @logit(logger) + def clean(self): + """ + Remove the temporary directories/files created by the Archive task. + Presently, this is only the ROTDIR/expdir directory if EXPDIR archiving + was performed. + """ + + if self.archive_expdir: + rmdir(self.temp_expdir) + + return From 23b0454a41a79a2207a37f7177ef61b1f2b32f35 Mon Sep 17 00:00:00 2001 From: David Huber Date: Tue, 19 Nov 2024 14:25:00 -0600 Subject: [PATCH 04/11] Allow users to specify their HPC account --- workflow/generate_workflows.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/generate_workflows.sh b/workflow/generate_workflows.sh index 6a4cb9910a..55d3a24d96 100755 --- a/workflow/generate_workflows.sh +++ b/workflow/generate_workflows.sh @@ -140,6 +140,7 @@ while [[ $# -gt 0 && "$1" != "--" ]]; do G) _run_all_gfs=true ;; E) _run_all_gefs=true ;; S) _run_all_sfs=true ;; + A) _set_account=true && _hpc_account="${OPTARG}" ;; c) _update_cron=true ;; e) _email="${OPTARG}" && _set_email=true ;; t) _tag="_${OPTARG}" ;; From f88c07775f2dcdc479480258cf594e2352eb4266 Mon Sep 17 00:00:00 2001 From: David Huber Date: Tue, 19 Nov 2024 14:27:36 -0600 Subject: [PATCH 05/11] Start archiving EXPDIR on the first full cycle and name expdir directories to prevent cleaning other jobs --- parm/archive/expdir.yaml.j2 | 19 ++++++++++--------- ush/python/pygfs/task/archive.py | 32 +++++++++++++++++++------------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/parm/archive/expdir.yaml.j2 b/parm/archive/expdir.yaml.j2 index 2ca358c0ad..b9b07d278c 100644 --- a/parm/archive/expdir.yaml.j2 +++ b/parm/archive/expdir.yaml.j2 @@ -3,23 +3,24 @@ expdir: name: "EXPDIR" # Copy the experiment files from the EXPDIR into the ROTDIR for archiving + {% set copy_expdir = "expdir." ~ cycle_YMDH %} FileHandler: mkdir: - - "{{ temp_expdir }}" + - "{{ ROTDIR }}/{{ copy_expdir }}" copy: {% for config in glob(EXPDIR ~ "/config.*") %} - - [ "{{ config }}", "{{ temp_expdir }}/." ] + - [ "{{ config }}", "{{ ROTDIR }}/{{ copy_expdir }}/." ] {% endfor %} - - [ "{{ EXPDIR }}/{{ PSLOT }}.db", "{{ temp_expdir }}/." ] - - [ "{{ EXPDIR }}/{{ PSLOT }}.xml", "{{ temp_expdir }}/." ] + - [ "{{ EXPDIR }}/{{ PSLOT }}.db", "{{ ROTDIR }}/{{ copy_expdir }}/." ] + - [ "{{ EXPDIR }}/{{ PSLOT }}.xml", "{{ ROTDIR }}/{{ copy_expdir }}/." ] {% if ARCH_HASHES or ARCH_DIFFS %} - - [ "{{ EXPDIR }}/git_info.log", "{{ temp_expdir }}/." ] + - [ "{{ EXPDIR }}/git_info.log", "{{ ROTDIR }}/{{ copy_expdir }}/." ] {% endif %} target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar" required: - - "expdir/config.*" - - "expdir/{{ PSLOT }}.db" - - "expdir/{{ PSLOT }}.xml" + - "{{ copy_expdir }}/config.*" + - "{{ copy_expdir }}/{{ PSLOT }}.db" + - "{{ copy_expdir }}/{{ PSLOT }}.xml" {% if ARCH_HASHES or ARCH_DIFFS %} - - "expdir/git_info.log" + - "{{ copy_expdir }}/git_info.log" {% endif %} diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index e133752f4f..e2972716a0 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -7,7 +7,7 @@ from logging import getLogger from typing import Any, Dict, List -from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, +from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, to_timedelta, chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, rmdir, strftime, to_YMDH, which, chdir, ProcessError) @@ -114,10 +114,8 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str return arcdir_set, [] # Determine if we are archiving the EXPDIR this cycle - self.temp_expdir = "" if arch_dict.ARCH_EXPDIR: - self.archive_expdir, self.temp_expdir = Archive._archive_expdir(arch_dict) - arch_dict.temp_expdir = self.temp_expdir + self.archive_expdir = Archive._archive_expdir(arch_dict) arch_dict.archive_expdir = self.archive_expdir if self.archive_expdir: @@ -443,7 +441,7 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s @staticmethod @logit(logger) - def _archive_expdir(arch_dict: Dict[str, Any]) -> (bool, str): + def _archive_expdir(arch_dict: Dict[str, Any]) -> bool: """ This function checks if the EXPDIR should be archived this RUN/cycle and returns the temporary path in the ROTDIR where the EXPDIR will be @@ -472,25 +470,31 @@ def _archive_expdir(arch_dict: Dict[str, Any]) -> (bool, str): current_cycle = arch_dict.current_cycle sdate = arch_dict.SDATE edate = arch_dict.EDATE + mode = arch_dict.MODE + assim_freq = to_timedelta(f"+{arch_dict.assim_freq}H") # Convert frequency to seconds from hours freq = arch_dict.ARCH_EXPDIR_FREQ * 3600 # Skip gfs and enkf cycled RUNs (only archive during gdas RUNs) # (do not skip forecast-only, regardless of RUN) if arch_dict.NET == "gfs" and arch_dict.MODE == "cycled" and arch_dict.RUN != "gdas": - return False, "" + return False # Determine if we should skip this cycle - # If the frequency is set to 0, only run on sdate and edate + # If the frequency is set to 0, only run on sdate (+assim_freq for cycled) and edate if freq == 0: - if current_cycle != sdate or current_cycle != edate: - return False, "" + if mode == "forecast-only" and (current_cycle != sdate or current_cycle != edate): + return False + elif mode == "cycled" and (current_cycle != sdate + assim_freq or current_cycle != edate): + return False # Otherwise, the frequency is in hours - elif (sdate - current_cycle).total_seconds() % freq != 0: - return False, "" + elif mode == "forecast-only" and (sdate - current_cycle).total_seconds() % freq != 0: + return False + elif mode == "cycled" and (sdate + assim_freq - current_cycle).total_seconds() % freq != 0: + return False # Looks like we are archiving the EXPDIR - return True, os.path.join(arch_dict.ROTDIR, "expdir") + return True @staticmethod @logit(logger) @@ -577,6 +581,8 @@ def clean(self): """ if self.archive_expdir: - rmdir(self.temp_expdir) + temp_expdir_path = os.path.join(self.task_config.ROTDIR, "expdir." + + to_YMDH(self.task_config.current_cycle)) + rmdir(temp_expdir_path) return From 7d0b70ac6d8c8b32218678ba3ccb16a2ac6d5bde Mon Sep 17 00:00:00 2001 From: David Huber <69919478+DavidHuber-NOAA@users.noreply.github.com> Date: Tue, 19 Nov 2024 20:36:12 +0000 Subject: [PATCH 06/11] Trimmed .flake8 exclusions This .flake8 file was copied from wxflow. --- .flake8 | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.flake8 b/.flake8 index 35bff9e2f4..e3a4e3e47e 100644 --- a/.flake8 +++ b/.flake8 @@ -1,6 +1,3 @@ [flake8] -exclude = .git,.github,venv,__pycache__,docs/conf.py,old,build,dist +exclude = .git,.github,venv,__pycache__,old,build,dist max-line-length = 160 -per-file-ignores = - # imported but unused - __init__.py: F401 From 12b6f955226eb22e8350fdc15a16d7af91c0453a Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 2 Dec 2024 13:22:43 +0000 Subject: [PATCH 07/11] Remove extra A option --- workflow/generate_workflows.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/workflow/generate_workflows.sh b/workflow/generate_workflows.sh index 05b61b39ad..c98fa3028a 100755 --- a/workflow/generate_workflows.sh +++ b/workflow/generate_workflows.sh @@ -140,7 +140,6 @@ while [[ $# -gt 0 && "$1" != "--" ]]; do G) _run_all_gfs=true ;; E) _run_all_gefs=true ;; S) _run_all_sfs=true ;; - A) _set_account=true && _hpc_account="${OPTARG}" ;; c) _update_cron=true ;; e) _email="${OPTARG}" && _set_email=true ;; t) _tag="_${OPTARG}" ;; From b8cc2ef6234fbbcae6c095bd8de61243acc173da Mon Sep 17 00:00:00 2001 From: David Huber Date: Mon, 2 Dec 2024 13:40:58 +0000 Subject: [PATCH 08/11] Add documentation on EXPDIR archiving --- docs/source/configure.rst | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/source/configure.rst b/docs/source/configure.rst index 439c5df110..bc37bbf833 100644 --- a/docs/source/configure.rst +++ b/docs/source/configure.rst @@ -48,12 +48,15 @@ The global-workflow configs contain switches that change how the system runs. Ma | | (.true.) or cold (.false)? | | | be set when running ``setup_expt.py`` script with | | | | | | the ``--start`` flag (e.g. ``--start warm``) | +------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ -| HPSSARCH | Archive to HPPS | NO | Possibly | Whether to save output to tarballs on HPPS | +| HPSSARCH | Archive to HPPS | NO | NO | Whether to save output to tarballs on HPPS. | +------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ -| LOCALARCH | Archive to a local directory | NO | Possibly | Instead of archiving data to HPSS, archive to a | -| | | | | local directory, specified by ATARDIR. If | -| | | | | LOCALARCH=YES, then HPSSARCH must =NO. Changing | -| | | | | HPSSARCH from YES to NO will adjust the XML. | +| LOCALARCH | Archive to a local directory | NO | NO | Whether to save output to tarballs locally. For | +| | | | | HPSSARCH and LOCALARCH, ARCDIR specifies the | +| | | | | directory. These options are mutually exclusive. | ++------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ +| ARCH_EXPDIR | Archive the EXPDIR | NO | NO | Whether to create a tarball of the EXPDIR. | +| | | | | ARCH_HASHES and ARCH_DIFFS generate text files | +| | | | | of git output that are archived with the EXPDIR. | +------------------+----------------------------------+---------------+-------------+---------------------------------------------------+ | QUILTING | Use I/O quilting | .true. | NO | If .true. choose OUTPUT_GRID as cubed_sphere_grid | | | | | | in netcdf or gaussian_grid | From 7ab8dc1236341270bf9ab7a9cec07a76229f08e8 Mon Sep 17 00:00:00 2001 From: David Huber <69919478+DavidHuber-NOAA@users.noreply.github.com> Date: Tue, 3 Dec 2024 20:07:04 +0000 Subject: [PATCH 09/11] Apply suggestions from code review Co-authored-by: Walter Kolczynski - NOAA --- parm/archive/expdir.yaml.j2 | 2 -- ush/python/pygfs/task/archive.py | 40 ++++++++++++++------------------ 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/parm/archive/expdir.yaml.j2 b/parm/archive/expdir.yaml.j2 index b9b07d278c..e2ec3f4736 100644 --- a/parm/archive/expdir.yaml.j2 +++ b/parm/archive/expdir.yaml.j2 @@ -11,7 +11,6 @@ expdir: {% for config in glob(EXPDIR ~ "/config.*") %} - [ "{{ config }}", "{{ ROTDIR }}/{{ copy_expdir }}/." ] {% endfor %} - - [ "{{ EXPDIR }}/{{ PSLOT }}.db", "{{ ROTDIR }}/{{ copy_expdir }}/." ] - [ "{{ EXPDIR }}/{{ PSLOT }}.xml", "{{ ROTDIR }}/{{ copy_expdir }}/." ] {% if ARCH_HASHES or ARCH_DIFFS %} - [ "{{ EXPDIR }}/git_info.log", "{{ ROTDIR }}/{{ copy_expdir }}/." ] @@ -19,7 +18,6 @@ expdir: target: "{{ ATARDIR }}/{{ cycle_YMDH }}/expdir.tar" required: - "{{ copy_expdir }}/config.*" - - "{{ copy_expdir }}/{{ PSLOT }}.db" - "{{ copy_expdir }}/{{ PSLOT }}.xml" {% if ARCH_HASHES or ARCH_DIFFS %} - "{{ copy_expdir }}/git_info.log" diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index 1c6636922f..9144476d97 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -115,13 +115,13 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str # Determine if we are archiving the EXPDIR this cycle if arch_dict.ARCH_EXPDIR: - self.archive_expdir = Archive._archive_expdir(arch_dict) + self.archive_expdir = self._archive_expdir(arch_dict) arch_dict.archive_expdir = self.archive_expdir if self.archive_expdir: # If requested, get workflow hashes/statuses/diffs for EXPDIR archiving if arch_dict.ARCH_HASHES or arch_dict.ARCH_DIFFS: - Archive._pop_git_info(arch_dict) + self._pop_git_info(arch_dict) master_yaml = "master_" + arch_dict.RUN + ".yaml.j2" @@ -439,7 +439,6 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s return - @staticmethod @logit(logger) def _archive_expdir(arch_dict: Dict[str, Any]) -> bool: """ @@ -482,21 +481,18 @@ def _archive_expdir(arch_dict: Dict[str, Any]) -> bool: # Determine if we should skip this cycle # If the frequency is set to 0, only run on sdate (+assim_freq for cycled) and edate - if freq == 0: - if mode == "forecast-only" and (current_cycle != sdate or current_cycle != edate): - return False - elif mode == "cycled" and (current_cycle != sdate + assim_freq or current_cycle != edate): - return False - # Otherwise, the frequency is in hours - elif mode == "forecast-only" and (sdate - current_cycle).total_seconds() % freq != 0: - return False - elif mode == "cycled" and (sdate + assim_freq - current_cycle).total_seconds() % freq != 0: + first_full = sdate + if mode in ["cycled"]: + first_full += assim_freq + if current_cycle in [first_full, edate]: + # Always save the first and last + return True + elif (current_cycle - first_full).total_seconds() % freq == 0: + # Otherwise, the frequency is in hours + return True + else return False - # Looks like we are archiving the EXPDIR - return True - - @staticmethod @logit(logger) def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]: """ @@ -543,8 +539,8 @@ def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]: output += git("rev-parse", "HEAD", output=str) output += "\nSubmodule hashes:\n" output += git("submodule", "status", output=str) - except ProcessError: - raise OSError("FATAL ERROR Failed to run git") + except ProcessError as pe: + raise OSError("FATAL ERROR Failed to run git") from pe # Are we running git to get diffs? if arch_diffs: @@ -559,16 +555,16 @@ def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]: print("WARNING git was unable to do a recursive diff.\n" "Only a top level diff was performed.\n" "Note that the git version must be >= 2.14 for this feature.") - except ProcessError: - raise OSError("FATAL ERROR Failed to run 'git diff'") + except ProcessError as pe: + raise OSError("FATAL ERROR Failed to run 'git diff'") from pe # Write out to the log file try: with open(os.path.join(expdir, git_filename), 'w') as output_file: output_file.write(output) - except OSError: + except OSError as ose: fname = os.path.join(expdir, git_filename) - raise OSError(f"FATAL ERROR Unable to write git output to '{fname}'") + raise OSError(f"FATAL ERROR Unable to write git output to '{fname}'") from ose return From 0dd3eb89131e4cffb88ea7a08ceeeff1ab62a1ef Mon Sep 17 00:00:00 2001 From: David Huber <69919478+DavidHuber-NOAA@users.noreply.github.com> Date: Tue, 3 Dec 2024 20:11:27 +0000 Subject: [PATCH 10/11] Apply suggestions from code review --- ush/python/pygfs/task/archive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index 9144476d97..d000db0cd5 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -490,7 +490,7 @@ def _archive_expdir(arch_dict: Dict[str, Any]) -> bool: elif (current_cycle - first_full).total_seconds() % freq == 0: # Otherwise, the frequency is in hours return True - else + else: return False @logit(logger) From 28bb727d1d6418137f3bff6c85eb279bb7ce049e Mon Sep 17 00:00:00 2001 From: David Huber Date: Wed, 4 Dec 2024 18:19:47 +0000 Subject: [PATCH 11/11] Add self to instance methods --- ush/python/pygfs/task/archive.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index d000db0cd5..c6376206b3 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -113,8 +113,8 @@ def configure(self, arch_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str self.tar_cmd = "" return arcdir_set, [] - # Determine if we are archiving the EXPDIR this cycle - if arch_dict.ARCH_EXPDIR: + # Determine if we are archiving the EXPDIR this cycle (always skip for ensembles) + if "enkf" not in arch_dict.RUN and arch_dict.ARCH_EXPDIR: self.archive_expdir = self._archive_expdir(arch_dict) arch_dict.archive_expdir = self.archive_expdir @@ -440,7 +440,7 @@ def replace_string_from_to_file(filename_in, filename_out, search_str, replace_s return @logit(logger) - def _archive_expdir(arch_dict: Dict[str, Any]) -> bool: + def _archive_expdir(self, arch_dict: Dict[str, Any]) -> bool: """ This function checks if the EXPDIR should be archived this RUN/cycle and returns the temporary path in the ROTDIR where the EXPDIR will be @@ -494,7 +494,7 @@ def _archive_expdir(arch_dict: Dict[str, Any]) -> bool: return False @logit(logger) - def _pop_git_info(arch_dict: Dict[str, Any]) -> Dict[str, Any]: + def _pop_git_info(self, arch_dict: Dict[str, Any]) -> Dict[str, Any]: """ This function checks the configuration options ARCH_HASHES and ARCH_DIFFS and ARCH_EXPDIR_FREQ to determine if the git hashes and/or diffs should be