diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index 1325aff664..f317dbd320 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -1,7 +1,6 @@ -from typing import Dict, Any from applications.applications import AppConfig -from wxflow import Configuration, to_timedelta -from datetime import timedelta +from typing import Dict, Any +from wxflow import Configuration class GFSCycledAppConfig(AppConfig): @@ -11,116 +10,140 @@ class GFSCycledAppConfig(AppConfig): def __init__(self, conf: Configuration): super().__init__(conf) + # Re-read config.base without RUN specified to get the basic settings for + # cycled cases to be able to determine valid runs base = conf.parse_config('config.base') - self.do_hybvar = base.get('DOHYBVAR', False) - self.do_fit2obs = base.get('DO_FIT2OBS', True) - self.do_jediatmvar = base.get('DO_JEDIATMVAR', False) - self.do_jediatmens = base.get('DO_JEDIATMENS', False) - self.do_jediocnvar = base.get('DO_JEDIOCNVAR', False) - self.do_jedisnowda = base.get('DO_JEDISNOWDA', False) - self.do_mergensst = base.get('DO_MERGENSST', False) - self.do_vrfy_oceanda = base.get('DO_VRFY_OCEANDA', False) - - self.lobsdiag_forenkf = False - self.eupd_runs = None - if self.do_hybvar: - self.lobsdiag_forenkf = base.get('lobsdiag_forenkf', False) - eupd_run = base.get('EUPD_CYC', 'gdas').lower() - if eupd_run in ['both']: - self.eupd_runs = ['gfs', 'gdas'] - elif eupd_run in ['gfs', 'gdas']: - self.eupd_runs = [eupd_run] - - def _get_app_configs(self): + + self.ens_runs = [] + + if base.get('DOHYBVAR', False): + ens_run = base.get('EUPD_CYC', 'gdas').lower() + if ens_run in ['both']: + self.ens_runs = ['gfs', 'gdas'] + elif ens_run in ['gfs', 'gdas']: + self.ens_runs = [ens_run] + + # Now construct self.runs the desired XML order (gdas, enkfgdas, gfs, enkfgfs) + self.runs = ["gdas"] # We always have a 'gdas' run + self.runs.append('enkfgdas') if 'gdas' in self.ens_runs else 0 + self.runs.append("gfs") if base['INTERVAL_GFS'] > 0 else 0 + self.runs.append('enkfgfs') if 'gfs' in self.ens_runs and "gfs" in self.runs else 0 + + def _get_run_options(self, conf: Configuration) -> Dict[str, Any]: + + run_options = super()._get_run_options(conf) + + for run in self.runs: + base = conf.parse_config('config.base', RUN=run) + + run_options[run]['do_hybvar'] = base.get('DOHYBVAR', False) + run_options[run]['do_hybvar_ocn'] = base.get('DOHYBVAR_OCN', False) + run_options[run]['nens'] = base.get('NMEM_ENS', 0) + if run_options[run]['do_hybvar']: + run_options[run]['lobsdiag_forenkf'] = base.get('lobsdiag_forenkf', False) + + run_options[run]['do_fit2obs'] = base.get('DO_FIT2OBS', True) + run_options[run]['do_jediatmvar'] = base.get('DO_JEDIATMVAR', False) + run_options[run]['do_jediatmens'] = base.get('DO_JEDIATMENS', False) + run_options[run]['do_jediocnvar'] = base.get('DO_JEDIOCNVAR', False) + run_options[run]['do_jedisnowda'] = base.get('DO_JEDISNOWDA', False) + run_options[run]['do_mergensst'] = base.get('DO_MERGENSST', False) + run_options[run]['do_vrfy_oceanda'] = base.get('DO_VRFY_OCEANDA', False) + + return run_options + + def _get_app_configs(self, run): """ - Returns the config_files that are involved in the cycled app + Returns the config files that are involved in the cycled app """ + options = self.run_options[run] configs = ['prep'] - if self.do_jediatmvar: + if options['do_jediatmvar']: configs += ['prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal'] else: configs += ['anal', 'analdiag'] - if self.do_jediocnvar: + if options['do_jediocnvar']: configs += ['prepoceanobs', 'marineanlinit', 'marinebmat', 'marineanlvar'] - if self.do_hybvar: + if options['do_hybvar']: configs += ['marineanlletkf', 'ocnanalecen'] configs += ['marineanlchkpt', 'marineanlfinal'] - if self.do_vrfy_oceanda: + if options['do_vrfy_oceanda']: configs += ['ocnanalvrfy'] - if self.do_ocean or self.do_ice: + if options['do_ocean'] or options['do_ice']: configs += ['oceanice_products'] configs += ['stage_ic', 'sfcanl', 'analcalc', 'fcst', 'upp', 'atmos_products', 'arch', 'cleanup'] - if self.do_hybvar: - if self.do_jediatmens: - configs += ['atmensanlinit', 'atmensanlobs', 'atmensanlsol', 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] + if options['do_hybvar']: + if options['do_jediatmens']: + configs += ['atmensanlinit', 'atmensanlobs', 'atmensanlsol', + 'atmensanlletkf', 'atmensanlfv3inc', 'atmensanlfinal'] else: configs += ['eobs', 'eomg', 'ediag', 'eupd'] configs += ['ecen', 'esfc', 'efcs', 'echgres', 'epos', 'earc'] - if self.do_fit2obs: + if options['do_fit2obs']: configs += ['fit2obs'] - if self.do_verfozn: + if options['do_verfozn']: configs += ['verfozn'] - if self.do_verfrad: + if options['do_verfrad']: configs += ['verfrad'] - if self.do_vminmon: + if options['do_vminmon']: configs += ['vminmon'] - - if self.do_anlstat: + + if options['do_anlstat']: configs += ['anlstat'] - if self.do_tracker: + if options['do_tracker']: configs += ['tracker'] - if self.do_genesis: + if options['do_genesis']: configs += ['genesis'] - if self.do_genesis_fsu: + if options['do_genesis_fsu']: configs += ['genesis_fsu'] - if self.do_metp: + if options['do_metp']: configs += ['metp'] - if self.do_gempak: + if options['do_gempak']: configs += ['gempak'] - if self.do_goes: + if options['do_goes']: configs += ['npoess'] - if self.do_bufrsnd: + if options['do_bufrsnd']: configs += ['postsnd'] - if self.do_awips: + if options['do_awips']: configs += ['awips'] - if self.do_wave: + if options['do_wave']: configs += ['waveinit', 'waveprep', 'wavepostsbs', 'wavepostpnt'] - if self.do_wave_bnd: + if options['do_wave_bnd']: configs += ['wavepostbndpnt', 'wavepostbndpntbll'] - if self.do_gempak: + if options['do_gempak']: configs += ['wavegempak'] - if self.do_awips: + if options['do_awips']: configs += ['waveawipsbulls', 'waveawipsgridded'] - if self.do_aero: + if options['do_aero_anl']: configs += ['aeroanlgenb', 'aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] - if self.do_prep_obs_aero: + if options['do_prep_obs_aero']: configs += ['prepobsaero'] - if self.do_jedisnowda: + if options['do_jedisnowda']: configs += ['snowanl'] - if self.do_hybvar: - configs += ['esnowrecen'] + if options['do_hybvar']: + configs += ['esnowanl'] - if self.do_mos: + if options['do_mos']: configs += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', @@ -135,181 +158,174 @@ def _update_base(base_in): def get_task_names(self): """ - Get the task names for all the tasks in the cycled application. - Note that the order of the task names matters in the XML. - This is the place where that order is set. + Get the task names for each valid run in this cycled configuration. + NOTE: The order of the task names matters in the XML. + This is the place where that order is set. """ - gdas_gfs_common_tasks_before_fcst = ['prep'] - gdas_gfs_common_cleanup_tasks = ['arch', 'cleanup'] - - if self.do_jediatmvar: - gdas_gfs_common_tasks_before_fcst += ['prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal'] - else: - gdas_gfs_common_tasks_before_fcst += ['anal'] - - if self.do_jediocnvar: - gdas_gfs_common_tasks_before_fcst += ['prepoceanobs', 'marineanlinit', 'marinebmat', 'marineanlvar'] - if self.do_hybvar: - gdas_gfs_common_tasks_before_fcst += ['marineanlletkf', 'ocnanalecen'] - gdas_gfs_common_tasks_before_fcst += ['marineanlchkpt', 'marineanlfinal'] - if self.do_vrfy_oceanda: - gdas_gfs_common_tasks_before_fcst += ['ocnanalvrfy'] - - gdas_gfs_common_tasks_before_fcst += ['sfcanl', 'analcalc'] - - if self.do_jedisnowda: - gdas_gfs_common_tasks_before_fcst += ['snowanl'] - - wave_prep_tasks = ['waveinit', 'waveprep'] - wave_bndpnt_tasks = ['wavepostbndpnt', 'wavepostbndpntbll'] - wave_post_tasks = ['wavepostsbs', 'wavepostpnt'] - - hybrid_tasks = [] - hybrid_after_eupd_tasks = [] - if self.do_hybvar: - if self.do_jediatmens: - hybrid_tasks += ['atmensanlinit', 'atmensanlfv3inc', 'atmensanlfinal', 'echgres'] - hybrid_tasks += ['atmensanlobs', 'atmensanlsol'] if self.lobsdiag_forenkf else ['atmensanlletkf'] - else: - hybrid_tasks += ['eobs', 'eupd', 'echgres'] - hybrid_tasks += ['ediag'] if self.lobsdiag_forenkf else ['eomg'] - if self.do_jedisnowda: - hybrid_tasks += ['esnowrecen'] - hybrid_after_eupd_tasks += ['stage_ic', 'ecen', 'esfc', 'efcs', 'epos', 'earc', 'cleanup'] - - # Collect all "gdas" cycle tasks - gdas_tasks = gdas_gfs_common_tasks_before_fcst.copy() - - if not self.do_jediatmvar: - gdas_tasks += ['analdiag'] - - if self.do_wave and 'gdas' in self.wave_runs: - gdas_tasks += wave_prep_tasks - - if self.do_aero and 'gdas' in self.aero_anl_runs: - gdas_tasks += ['aeroanlgenb', 'aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] - if self.do_prep_obs_aero: - gdas_tasks += ['prepobsaero'] - - gdas_tasks += ['stage_ic', 'atmanlupp', 'atmanlprod', 'fcst'] - - if self.do_upp: - gdas_tasks += ['atmupp'] - gdas_tasks += ['atmos_prod'] - - if self.do_wave and 'gdas' in self.wave_runs: - if self.do_wave_bnd: - gdas_tasks += wave_bndpnt_tasks - gdas_tasks += wave_post_tasks - - if self.do_fit2obs: - gdas_tasks += ['fit2obs'] - - if self.do_verfozn: - gdas_tasks += ['verfozn'] - - if self.do_verfrad: - gdas_tasks += ['verfrad'] - - if self.do_vminmon: - gdas_tasks += ['vminmon'] - - if self.do_gempak: - gdas_tasks += ['gempak', 'gempakmetancdc'] - - gdas_tasks += gdas_gfs_common_cleanup_tasks - - # Collect "gfs" cycle tasks - gfs_tasks = gdas_gfs_common_tasks_before_fcst.copy() - - if self.do_wave and 'gfs' in self.wave_runs: - gfs_tasks += wave_prep_tasks - - if self.do_aero and 'gfs' in self.aero_anl_runs: - gfs_tasks += ['aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] - if self.do_prep_obs_aero: - gfs_tasks += ['prepobsaero'] - - gfs_tasks += ['atmanlupp', 'atmanlprod', 'fcst'] - - if self.do_ocean: - gfs_tasks += ['ocean_prod'] - - if self.do_ice: - gfs_tasks += ['ice_prod'] - - if self.do_upp: - gfs_tasks += ['atmupp'] - gfs_tasks += ['atmos_prod'] - - if self.do_goes: - gfs_tasks += ['goesupp'] - - if self.do_vminmon: - gfs_tasks += ['vminmon'] - - if self.do_anlstat: - gfs_tasks += ['anlstat'] - - if self.do_tracker: - gfs_tasks += ['tracker'] - - if self.do_genesis: - gfs_tasks += ['genesis'] - - if self.do_genesis_fsu: - gfs_tasks += ['genesis_fsu'] - - if self.do_metp: - gfs_tasks += ['metp'] - - if self.do_wave and 'gfs' in self.wave_runs: - if self.do_wave_bnd: - gfs_tasks += wave_bndpnt_tasks - gfs_tasks += wave_post_tasks - if self.do_gempak: - gfs_tasks += ['wavegempak'] - if self.do_awips: - gfs_tasks += ['waveawipsbulls', 'waveawipsgridded'] - - if self.do_bufrsnd: - gfs_tasks += ['postsnd'] - - if self.do_gempak: - gfs_tasks += ['gempak'] - gfs_tasks += ['gempakmeta'] - gfs_tasks += ['gempakncdcupapgif'] - if self.do_goes: - gfs_tasks += ['npoess_pgrb2_0p5deg'] - gfs_tasks += ['gempakpgrb2spec'] - - if self.do_awips: - gfs_tasks += ['awips_20km_1p0deg', 'fbwind'] - - if self.do_mos: - gfs_tasks += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', - 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', - 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', 'mos_ext_grd_prdgen', - 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] - - gfs_tasks += gdas_gfs_common_cleanup_tasks - - tasks = dict() - tasks['gdas'] = gdas_tasks - - if self.do_hybvar and 'gdas' in self.eupd_runs: - enkfgdas_tasks = hybrid_tasks + hybrid_after_eupd_tasks - tasks['enkfgdas'] = enkfgdas_tasks - - # Add RUN=gfs tasks if running early cycle - if self.interval_gfs > to_timedelta("0H"): - tasks['gfs'] = gfs_tasks + # Start with a dictionary of empty task lists for each valid run + task_names = {run: [] for run in self.runs} - if self.do_hybvar and 'gfs' in self.eupd_runs: - enkfgfs_tasks = hybrid_tasks + hybrid_after_eupd_tasks - enkfgfs_tasks.remove("echgres") - enkfgfs_tasks.remove("esnowrecen") - tasks['enkfgfs'] = enkfgfs_tasks + for run in self.runs: + options = self.run_options[run] - return tasks \ No newline at end of file + # Common gdas and gfs tasks before fcst + if run in ['gdas', 'gfs']: + task_names[run] += ['prep'] + if options['do_jediatmvar']: + task_names[run] += ['prepatmiodaobs', 'atmanlinit', 'atmanlvar', 'atmanlfv3inc', 'atmanlfinal'] + else: + task_names[run] += ['anal'] + + if options['do_jediocnvar']: + task_names[run] += ['prepoceanobs', 'marineanlinit', 'marinebmat', 'marineanlvar'] + if options['do_hybvar']: + task_names[run] += ['marineanlletkf', 'ocnanalecen'] + task_names[run] += ['marineanlchkpt', 'marineanlfinal'] + if options['do_vrfy_oceanda']: + task_names[run] += ['ocnanalvrfy'] + + task_names[run] += ['sfcanl', 'analcalc'] + + if options['do_jedisnowda']: + task_names[run] += ['snowanl'] + + wave_prep_tasks = ['waveinit', 'waveprep'] + wave_bndpnt_tasks = ['wavepostbndpnt', 'wavepostbndpntbll'] + wave_post_tasks = ['wavepostsbs', 'wavepostpnt'] + + # gdas- and gfs-specific analysis tasks + if run == 'gdas': + if not options['do_jediatmvar']: + task_names[run] += ['analdiag'] + + if options['do_wave']: + task_names[run] += wave_prep_tasks + + if options['do_aero_anl']: + task_names[run] += ['aeroanlgenb'] + + else: + if options['do_wave']: + task_names[run] += wave_prep_tasks + + if options['do_aero_anl']: + task_names[run] += ['aeroanlinit', 'aeroanlvar', 'aeroanlfinal'] + + if options['do_prep_obs_aero']: + task_names[run] += ['prepobsaero'] + + # Staging is gdas-specific + if run == 'gdas': + task_names[run] += ['stage_ic'] + + task_names[run] += ['atmanlupp', 'atmanlprod', 'fcst'] + + # gfs-specific products + if run == 'gfs': + if options['do_ocean']: + task_names[run] += ['ocean_prod'] + + if options['do_ice']: + task_names[run] += ['ice_prod'] + + if options['do_upp']: + task_names[run] += ['atmupp'] + task_names[run] += ['atmos_prod'] + + # GOES post-processing (gfs only) + if run == 'gfs': + if options['do_goes']: + task_names[run] += ['goesupp'] + + # Only fit to obs and verify ozone and radiance during gdas cycles + if run == "gdas": + if options['do_fit2obs']: + task_names[run] += ['fit2obs'] + if options['do_verfozn']: + task_names[run] += ['verfozn'] + if options['do_verfrad']: + task_names[run] += ['verfrad'] + + if options['do_vminmon']: + task_names[run] += ['vminmon'] + + if options['do_anlstat']: + task_names[run] += ['anlstat'] + + # gfs-only verification/tracking + if run == 'gfs': + if options['do_tracker']: + task_names[run] += ['tracker'] + + if options['do_genesis']: + task_names[run] += ['genesis'] + + if options['do_genesis_fsu']: + task_names[run] += ['genesis_fsu'] + + if options['do_metp']: + task_names[run] += ['metp'] + + if options['do_wave']: + if options['do_wave_bnd']: + task_names[run] += wave_bndpnt_tasks + task_names[run] += wave_post_tasks + # wave gempak and awips jobs are gfs-specific + if run == 'gfs': + if options['do_gempak']: + task_names[run] += ['wavegempak'] + if options['do_awips']: + task_names[run] += ['waveawipsbulls', 'waveawipsgridded'] + + # gdas- and gfs-specific downstream products + if run == 'gdas': + if options['do_gempak']: + task_names[run] += ['gempak', 'gempakmetancdc'] + else: + if options['do_bufrsnd']: + task_names[run] += ['postsnd'] + + if options['do_gempak']: + task_names[run] += ['gempak'] + task_names[run] += ['gempakmeta'] + task_names[run] += ['gempakncdcupapgif'] + if options['do_goes']: + task_names[run] += ['npoess_pgrb2_0p5deg'] + task_names[run] += ['gempakpgrb2spec'] + + if options['do_awips']: + task_names[run] += ['awips_20km_1p0deg', 'fbwind'] + + if options['do_mos']: + task_names[run] += ['mos_stn_prep', 'mos_grd_prep', 'mos_ext_stn_prep', 'mos_ext_grd_prep', + 'mos_stn_fcst', 'mos_grd_fcst', 'mos_ext_stn_fcst', 'mos_ext_grd_fcst', + 'mos_stn_prdgen', 'mos_grd_prdgen', 'mos_ext_stn_prdgen', + 'mos_ext_grd_prdgen', 'mos_wx_prdgen', 'mos_wx_ext_prdgen'] + + # Last two items + task_names[run] += ['arch', 'cleanup'] + + # Ensemble tasks + elif 'enkf' in run: + + if options['do_jediatmens']: + task_names[run] += ['atmensanlinit', 'atmensanlfv3inc', 'atmensanlfinal'] + # Only run echgres for the gdas cycle + task_names[run] += ['echgres'] if 'gdas' in run else 0 + if options['lobsdiag_forenkf']: + task_names[run] += ['atmensanlobs', 'atmensanlsol'] + else: + task_names[run] += ['atmensanlletkf'] + + else: + task_names[run] += ['eobs', 'eupd'] + task_names[run].append('echgres') if 'gdas' in run else 0 + task_names[run] += ['ediag'] if options['lobsdiag_forenkf'] else ['eomg'] + task_names[run].append('esnowanl') if options['do_jedisnowda'] and 'gdas' in run else 0 + + task_names[run].append('efcs') if 'gdas' in run else 0 + task_names[run].append('epos') if 'gdas' in run else 0 + task_names[run] += ['stage_ic', 'ecen', 'esfc', 'earc', 'cleanup'] + + return task_names \ No newline at end of file