Skip to content

Commit

Permalink
update applications.py
Browse files Browse the repository at this point in the history
  • Loading branch information
kevindougherty-noaa committed Jan 8, 2025
1 parent 0c1ba04 commit bec7451
Showing 1 changed file with 83 additions and 70 deletions.
153 changes: 83 additions & 70 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#!/usr/bin/env python3

from typing import Dict, List, Any
from datetime import timedelta
from hosts import Host
from wxflow import Configuration
from wxflow import Configuration, to_timedelta
from abc import ABC, ABCMeta, abstractmethod

__all__ = ['AppConfig']
Expand Down Expand Up @@ -30,84 +31,95 @@ def __init__(self, conf: Configuration) -> None:

self.scheduler = Host().scheduler

# Get the most basic settings from config.base to determine
# experiment type ({NET}_{MODE})
base = conf.parse_config('config.base')

self.mode = base['MODE']

if self.mode not in self.VALID_MODES:
raise NotImplementedError(f'{self.mode} is not a valid application mode.\n'
f'Valid application modes are:\n'
f'{", ".join(self.VALID_MODES)}\n')

self.net = base['NET']
print(f"Generating the XML for a {self.mode}_{self.net} case")
self.model_app = base.get('APP', 'ATM')
self.do_atm = base.get('DO_ATM', True)
self.do_wave = base.get('DO_WAVE', False)
self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False)
self.do_ocean = base.get('DO_OCN', False)
self.do_ice = base.get('DO_ICE', False)
self.do_aero = base.get('DO_AERO', False)
self.do_prep_obs_aero = base.get('DO_PREP_OBS_AERO', False)
self.do_bufrsnd = base.get('DO_BUFRSND', False)
self.do_gempak = base.get('DO_GEMPAK', False)
self.do_awips = base.get('DO_AWIPS', False)
self.do_verfozn = base.get('DO_VERFOZN', True)
self.do_verfrad = base.get('DO_VERFRAD', True)
self.do_vminmon = base.get('DO_VMINMON', True)
self.do_anlstat = base.get('DO_ANLSTAT', True)
self.do_tracker = base.get('DO_TRACKER', True)
self.do_genesis = base.get('DO_GENESIS', True)
self.do_genesis_fsu = base.get('DO_GENESIS_FSU', False)
self.do_metp = base.get('DO_METP', False)
self.do_upp = not base.get('WRITE_DOPOST', True)
self.do_goes = base.get('DO_GOES', False)
self.do_mos = base.get('DO_MOS', False)
self.do_extractvars = base.get('DO_EXTRACTVARS', False)

self.do_hpssarch = base.get('HPSSARCH', False)

self.nens = base.get('NMEM_ENS', 0)
self.fcst_segments = base.get('FCST_SEGMENTS', None)
self.interval_gfs = to_timedelta(f"{base.get('INTERVAL_GFS')}H")

if not AppConfig.is_monotonic(self.fcst_segments):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

self.wave_runs = None
if self.do_wave:
wave_run = base.get('WAVE_RUN', 'BOTH').lower()
if wave_run in ['both']:
self.wave_runs = ['gfs', 'gdas']
elif wave_run in ['gfs', 'gdas']:
self.wave_runs = [wave_run]

self.aero_anl_runs = None
self.aero_fcst_runs = None
if self.do_aero:
aero_anl_run = base.get('AERO_ANL_RUN', 'BOTH').lower()
if aero_anl_run in ['both']:
self.aero_anl_runs = ['gfs', 'gdas']
elif aero_anl_run in ['gfs', 'gdas']:
self.aero_anl_runs = [aero_anl_run]
aero_fcst_run = base.get('AERO_FCST_RUN', None).lower()
if aero_fcst_run in ['both']:
self.aero_fcst_runs = ['gfs', 'gdas']
elif aero_fcst_run in ['gfs', 'gdas']:
self.aero_fcst_runs = [aero_fcst_run]

def _init_finalize(self, conf: Configuration):
'''
Finalize object initialization calling subclass methods
'''
print("Finalizing initialize")

# Get run-, net-, and mode-based options
self.run_options = self._get_run_options(conf)
# Get a list of all possible config_files that would be part of the application
self.configs_names = self._get_app_configs()

# Get task names and runs for the application
self.task_names = self.get_task_names()
# Source the config files for the jobs in the application without specifying a RUN
self.configs = {'_no_run': self._source_configs(conf)}

# Initialize the configs and model_apps dictionaries
self.configs = dict.fromkeys(self.runs)
# Update the base config dictionary based on application
self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base'])

# Now configure the experiment for each valid run
for run in self.runs:
self.configs[run] = self._source_configs(conf, run=run, log=False)
# Save base in the internal state since it is often needed
base = self.configs['_no_run']['base']

def _get_run_options(self, conf: Configuration) -> Dict[str, Any]:
'''
Determine the do_* and APP options for each RUN by sourcing config.base
for each RUN and collecting the flags into self.run_options. Note that
this method is overloaded so additional NET- and MODE-dependent flags
can be set.
'''
# Get task names for the application
self.task_names = self.get_task_names()

run_options = {run: {} for run in dict.fromkeys(self.runs)}
for run in self.runs:
# Read config.base with RUN specified
run_base = conf.parse_config('config.base', RUN=run)

run_options[run]['app'] = run_base.get('APP', 'ATM')
run_options[run]['do_wave_bnd'] = run_base.get('DOBNDPNT_WAVE', False)
run_options[run]['do_bufrsnd'] = run_base.get('DO_BUFRSND', False)
run_options[run]['do_gempak'] = run_base.get('DO_GEMPAK', False)
run_options[run]['do_awips'] = run_base.get('DO_AWIPS', False)
run_options[run]['do_verfozn'] = run_base.get('DO_VERFOZN', True)
run_options[run]['do_verfrad'] = run_base.get('DO_VERFRAD', True)
run_options[run]['do_vminmon'] = run_base.get('DO_VMINMON', True)
run_options[run]['do_anlstat'] = run_base.get('DO_ANLSTAT', True)
run_options[run]['do_tracker'] = run_base.get('DO_TRACKER', True)
run_options[run]['do_genesis'] = run_base.get('DO_GENESIS', True)
run_options[run]['do_genesis_fsu'] = run_base.get('DO_GENESIS_FSU', False)
run_options[run]['do_metp'] = run_base.get('DO_METP', False)
run_options[run]['do_upp'] = not run_base.get('WRITE_DOPOST', True)
run_options[run]['do_goes'] = run_base.get('DO_GOES', False)
run_options[run]['do_mos'] = run_base.get('DO_MOS', False)
run_options[run]['do_extractvars'] = run_base.get('DO_EXTRACTVARS', False)

run_options[run]['do_atm'] = run_base.get('DO_ATM', True)
run_options[run]['do_wave'] = run_base.get('DO_WAVE', False)
run_options[run]['do_ocean'] = run_base.get('DO_OCN', False)
run_options[run]['do_ice'] = run_base.get('DO_ICE', False)
run_options[run]['do_prep_obs_aero'] = run_base.get('DO_PREP_OBS_AERO', False)
run_options[run]['do_aero_anl'] = run_base.get('DO_AERO_ANL', False)
run_options[run]['do_aero_fcst'] = run_base.get('DO_AERO_FCST', False)

run_options[run]['do_hpssarch'] = run_base.get('HPSSARCH', False)
run_options[run]['fcst_segments'] = run_base.get('FCST_SEGMENTS', None)

if not AppConfig.is_monotonic(run_options[run]['fcst_segments']):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

# Return the dictionary of run options
return run_options
# Finally, source the configuration files for each valid `RUN`
for run in self.task_names.keys():
self.configs[run] = self._source_configs(conf, run=run, log=False)

# Update the base config dictionary based on application and RUN
self.configs[run]['base'] = self._update_base(self.configs[run]['base'])

@abstractmethod
def _get_app_configs(self):
Expand Down Expand Up @@ -139,12 +151,13 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru
Every config depends on "config.base"
"""

# Include config.base by its lonesome and update it
configs = {'base': conf.parse_config('config.base', RUN=run)}
configs['base'] = self._update_base(configs['base'])
configs = dict()

# Return config.base as well
configs['base'] = conf.parse_config('config.base', RUN=run)

# Source the list of all config_files involved in the application
for config in self._get_app_configs(run):
for config in self.configs_names:

# All must source config.base first
files = ['config.base']
Expand All @@ -170,17 +183,17 @@ def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = Tru
return configs

@abstractmethod
def get_task_names(self, run: str) -> Dict[str, List[str]]:
def get_task_names(self, run="_no_run") -> Dict[str, List[str]]:
'''
Create a list of valid RUNs and a dict of task names for each RUN valid for the configuation.
Create a list of task names for each RUN valid for the configuation.
Parameters
----------
None
Returns
-------
Dict[str, List[str]]: Lists of all tasks for each RUN.
Dict[str, List[str]]: Lists of tasks for each RUN.
'''
pass
Expand All @@ -206,4 +219,4 @@ def is_monotonic(test_list: List, check_decreasing: bool = False) -> bool:
if check_decreasing:
return all(x > y for x, y in zip(test_list, test_list[1:]))
else:
return all(x < y for x, y in zip(test_list, test_list[1:]))
return all(x < y for x, y in zip(test_list, test_list[1:]))

0 comments on commit bec7451

Please sign in to comment.