Skip to content

Commit

Permalink
Splits AWIPS jobs into seperate tasks (#2094)
Browse files Browse the repository at this point in the history
This PR addresses issue #1228. The following is accomplished:

- Separate rocoto jobs have been created beneath `jobs/rocoto/`
-- `awips_20sh` and `awips_g2sh`; these jobs replace `awips.sh` which was
calling multiple J-jobs within the respective `awips.sh` scripts;
- New tasks has been added to `workflow/rocoto/tasks.py` for the new
AWIPS scripts;
- The `gfs_cycled` and `gfs_forecast_only` modules beneath
`workflow/rocoto` have been updated accordingly.

  Resolves #1228
  • Loading branch information
HenryRWinterbottom authored Dec 5, 2023
1 parent 04d97e9 commit e62a3a7
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 52 deletions.
20 changes: 7 additions & 13 deletions jobs/rocoto/awips.sh → jobs/rocoto/awips_20km_1p0deg.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ source "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
status=$?
(( status != 0 )) && exit "${status}"

export job="awips"
export job="awips_20km_1p0deg"
export jobid="${job}.$$"

# TODO (#1228) - This script is doing more than just calling a j-job
# Also, this forces us to call the config files here instead of the j-job
source "${HOMEgfs}/ush/jjob_header.sh" -e "awips" -c "base awips"

fhrlst=$(echo ${FHRLST} | sed -e 's/_/ /g; s/f/ /g; s/,/ /g')
# shellcheck disable=SC2153
fhrlst=$(echo "${FHRLST}" | sed -e 's/_/ /g; s/f/ /g; s/,/ /g')

###############################################################

Expand All @@ -45,22 +44,17 @@ for fhr3 in ${fhrlst}; do
fhmax=84
if (( fhr >= fhmin && fhr <= fhmax )); then
if ((fhr % 3 == 0)); then
export fcsthrs=${fhr3}
${AWIPS20SH}
fi

if ((fhr % 6 == 0)); then
${AWIPSG2SH}
export fcsthrs="${fhr3}"
"${AWIPS20KM1P0DEGSH}"
fi
fi

fhmin=90
fhmax=240
if (( fhr >= fhmin && fhr <= fhmax )); then
if ((fhr % 6 == 0)); then
export fcsthrs=${fhr3}
${AWIPS20SH}
${AWIPSG2SH}
export fcsthrs="${fhr3}"
"${AWIPS20KM1P0DEGSH}"
fi
fi
done
Expand Down
57 changes: 57 additions & 0 deletions jobs/rocoto/awips_g2.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#! /usr/bin/env bash

source "${HOMEgfs}/ush/preamble.sh"

###############################################################
## Abstract:
## Inline awips driver script
## HOMEgfs : /full/path/to/workflow
## EXPDIR : /full/path/to/config/files
## CDATE : current analysis date (YYYYMMDDHH)
## CDUMP : cycle name (gdas / gfs)
## PDY : current date (YYYYMMDD)
## cyc : current cycle (HH)
###############################################################

###############################################################
# Source FV3GFS workflow modules
source "${HOMEgfs}/ush/load_fv3gfs_modules.sh"
status=$?
(( status != 0 )) && exit "${status}"

export job="awips_g2"
export jobid="${job}.$$"

source "${HOMEgfs}/ush/jjob_header.sh" -e "awips" -c "base awips"

# shellcheck disable=SC2153
fhrlst=$(echo "${FHRLST}" | sed -e "s/_/ /g; s/f/ /g; s/,/ /g")

###############################################################

################################################################################
echo
echo "=============== BEGIN AWIPS ==============="

for fhr3 in ${fhrlst}; do
fhr=$(( 10#${fhr3} ))
if (( fhr > FHMAX_GFS )); then
echo "Nothing to process for FHR = ${fhr3}, cycle"
continue
fi

fhmin=0
fhmax=240
if (( fhr >= fhmin && fhr <= fhmax )); then
if ((fhr % 6 == 0)); then
"${AWIPSG2SH}"
fi
fi
done


###############################################################
# Force Exit out cleanly
if [[ ${KEEPDATA:-"NO"} == "NO" ]] ; then rm -rf "${DATA}" ; fi

exit 0
6 changes: 3 additions & 3 deletions parm/config/gfs/config.awips
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
echo "BEGIN: config.awips"

# Get task specific resources
. $EXPDIR/config.resources awips
. "${EXPDIR}/config.resources" awips

export AWIPS20SH=$HOMEgfs/jobs/JGFS_ATMOS_AWIPS_20KM_1P0DEG
export AWIPSG2SH=$HOMEgfs/jobs/JGFS_ATMOS_AWIPS_G2
export AWIPS20KM1P0DEGSH="${HOMEgfs}/jobs/JGFS_ATMOS_AWIPS_20KM_1P0DEG"
export AWIPSG2SH="${HOMEgfs}/jobs/JGFS_ATMOS_AWIPS_G2"

# No. of concurrent awips jobs
export NAWIPSGRP=42
Expand Down
3 changes: 2 additions & 1 deletion workflow/applications/gfs_cycled.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ def get_task_names(self):
gfs_tasks += ['gempak']

if self.do_awips:
gfs_tasks += ['awips']
gfs_tasks += ['awips_20km_1p0deg']
gfs_tasks += ['awips_g2']
gfs_tasks += ['fbwinds']

if self.do_npoess:
Expand Down
3 changes: 2 additions & 1 deletion workflow/applications/gfs_forecast_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def get_task_names(self):
tasks += ['gempak']

if self.do_awips:
tasks += ['awips']
tasks += ['awips_20km_1p0deg']
tasks += ['awips_g2']
tasks += ['fbwinds']

if self.do_wafs:
Expand Down
91 changes: 58 additions & 33 deletions workflow/rocoto/gfs_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,43 +822,68 @@ def fbwinds(self):

return task

def awips(self):
@staticmethod
def _get_awipsgroups(cdump, config):

def _get_awipsgroups(cdump, config):
fhmin = config['FHMIN']
fhmax = config['FHMAX']
fhout = config['FHOUT']

fhmin = config['FHMIN']
fhmax = config['FHMAX']
fhout = config['FHOUT']
# Get a list of all forecast hours
fhrs = []
if cdump in ['gdas']:
fhrs = range(fhmin, fhmax + fhout, fhout)
elif cdump in ['gfs']:
fhmax = np.max(
[config['FHMAX_GFS_00'], config['FHMAX_GFS_06'], config['FHMAX_GFS_12'], config['FHMAX_GFS_18']])
fhout = config['FHOUT_GFS']
fhmax_hf = config['FHMAX_HF_GFS']
fhout_hf = config['FHOUT_HF_GFS']
if fhmax > 240:
fhmax = 240
if fhmax_hf > 240:
fhmax_hf = 240
fhrs_hf = list(range(fhmin, fhmax_hf + fhout_hf, fhout_hf))
fhrs = fhrs_hf + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout))

nawipsgrp = config['NAWIPSGRP']
ngrps = nawipsgrp if len(fhrs) > nawipsgrp else len(fhrs)

fhrs = [f'f{fhr:03d}' for fhr in fhrs]
fhrs = np.array_split(fhrs, ngrps)
fhrs = [fhr.tolist() for fhr in fhrs]

grp = ' '.join([f'_{fhr[0]}-{fhr[-1]}' for fhr in fhrs])
dep = ' '.join([fhr[-1] for fhr in fhrs])
lst = ' '.join(['_'.join(fhr) for fhr in fhrs])

return grp, dep, lst

def awips_20km_1p0deg(self):

# Get a list of all forecast hours
fhrs = []
if cdump in ['gdas']:
fhrs = range(fhmin, fhmax + fhout, fhout)
elif cdump in ['gfs']:
fhmax = np.max(
[config['FHMAX_GFS_00'], config['FHMAX_GFS_06'], config['FHMAX_GFS_12'], config['FHMAX_GFS_18']])
fhout = config['FHOUT_GFS']
fhmax_hf = config['FHMAX_HF_GFS']
fhout_hf = config['FHOUT_HF_GFS']
if fhmax > 240:
fhmax = 240
if fhmax_hf > 240:
fhmax_hf = 240
fhrs_hf = list(range(fhmin, fhmax_hf + fhout_hf, fhout_hf))
fhrs = fhrs_hf + list(range(fhrs_hf[-1] + fhout, fhmax + fhout, fhout))
deps = []
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}post'}
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)

nawipsgrp = config['NAWIPSGRP']
ngrps = nawipsgrp if len(fhrs) > nawipsgrp else len(fhrs)
awipsenvars = self.envars.copy()
awipsenvar_dict = {'FHRGRP': '#grp#',
'FHRLST': '#lst#',
'ROTDIR': self._base.get('ROTDIR')}
for key, value in awipsenvar_dict.items():
awipsenvars.append(rocoto.create_envar(name=key, value=str(value)))

fhrs = [f'f{fhr:03d}' for fhr in fhrs]
fhrs = np.array_split(fhrs, ngrps)
fhrs = [fhr.tolist() for fhr in fhrs]
varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = self._get_awipsgroups(self.cdump, self._configs['awips'])
vardict = {varname2: varval2, varname3: varval3}

grp = ' '.join([f'_{fhr[0]}-{fhr[-1]}' for fhr in fhrs])
dep = ' '.join([fhr[-1] for fhr in fhrs])
lst = ' '.join(['_'.join(fhr) for fhr in fhrs])
resources = self.get_resource('awips')
task = create_wf_task('awips_20km_1p0deg', resources, cdump=self.cdump, envar=awipsenvars, dependency=dependencies,
metatask='awips_20km_1p0deg', varname=varname1, varval=varval1, vardict=vardict)

return grp, dep, lst
return task

def awips_g2(self):

deps = []
dep_dict = {'type': 'metatask', 'name': f'{self.cdump}post'}
Expand All @@ -873,12 +898,12 @@ def _get_awipsgroups(cdump, config):
awipsenvars.append(rocoto.create_envar(name=key, value=str(value)))

varname1, varname2, varname3 = 'grp', 'dep', 'lst'
varval1, varval2, varval3 = _get_awipsgroups(self.cdump, self._configs['awips'])
varval1, varval2, varval3 = self._get_awipsgroups(self.cdump, self._configs['awips'])
vardict = {varname2: varval2, varname3: varval3}

resources = self.get_resource('awips')
task = create_wf_task('awips', resources, cdump=self.cdump, envar=awipsenvars, dependency=dependencies,
metatask='awips', varname=varname1, varval=varval1, vardict=vardict)
task = create_wf_task('awips_g2', resources, cdump=self.cdump, envar=awipsenvars, dependency=dependencies,
metatask='awips_g2', varname=varname1, varval=varval1, vardict=vardict)

return task

Expand Down
2 changes: 1 addition & 1 deletion workflow/rocoto/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Tasks:
'fcst', 'post', 'ocnpost',
'verfozn', 'verfrad', 'vminmon', 'metp',
'tracker', 'genesis', 'genesis_fsu',
'postsnd', 'awips', 'fbwinds', 'gempak',
'postsnd', 'awips_g2', 'awips_20km_1p0deg', 'fbwinds', 'gempak',
'waveawipsbulls', 'waveawipsgridded', 'wavegempak', 'waveinit',
'wavepostbndpnt', 'wavepostbndpntbll', 'wavepostpnt', 'wavepostsbs', 'waveprep',
'npoess']
Expand Down

0 comments on commit e62a3a7

Please sign in to comment.