diff --git a/workflow/pygw b/workflow/pygw
new file mode 120000
index 0000000000..dfa1d9a164
--- /dev/null
+++ b/workflow/pygw
@@ -0,0 +1 @@
+../ush/python/pygw/src/pygw
\ No newline at end of file
diff --git a/workflow/rocoto/workflow_tasks.py b/workflow/rocoto/workflow_tasks.py
index 5ec1dbb39c..edb35af513 100644
--- a/workflow/rocoto/workflow_tasks.py
+++ b/workflow/rocoto/workflow_tasks.py
@@ -310,10 +310,6 @@ def aerosol_init(self):
interval = self._base['INTERVAL']
offset = f'-{interval}'
- # Previous cycle
- dep_dict = {'type': 'cycleexist', 'offset': offset}
- deps.append(rocoto.add_dependency(dep_dict))
-
# Files from previous cycle
files = [f'@Y@m@d.@H0000.fv_core.res.nc'] + \
[f'@Y@m@d.@H0000.fv_core.res.tile{tile}.nc' for tile in range(1, self.n_tiles + 1)] + \
@@ -326,8 +322,10 @@ def aerosol_init(self):
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
+ cycledef = 'gfs_seq'
resources = self.get_resource('aerosol_init')
- task = create_wf_task('aerosol_init', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
+ task = create_wf_task('aerosol_init', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
+ cycledef=cycledef)
return task
@@ -387,8 +385,6 @@ def analdiag(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}anal'}
deps.append(rocoto.add_dependency(dep_dict))
- dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
- deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
resources = self.get_resource('analdiag')
@@ -446,8 +442,6 @@ def atmanalpost(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalrun'}
deps.append(rocoto.add_dependency(dep_dict))
- dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
- deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
resources = self.get_resource('atmanalpost')
@@ -459,7 +453,6 @@ def aeroanlinit(self):
suffix = self._base["SUFFIX"]
dump_suffix = self._base["DUMP_SUFFIX"]
- gfs_cyc = self._base["gfs_cyc"]
dmpdir = self._base["DMPDIR"]
deps = []
@@ -494,8 +487,6 @@ def aeroanlfinal(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}aeroanlrun'}
deps.append(rocoto.add_dependency(dep_dict))
- dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
- deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
resources = self.get_resource('aeroanlfinal')
@@ -508,8 +499,6 @@ def gldas(self):
deps = []
dep_dict = {'type': 'task', 'name': f'{self.cdump}sfcanl'}
deps.append(rocoto.add_dependency(dep_dict))
- dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'}
- deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
resources = self.get_resource('gldas')
@@ -604,8 +593,11 @@ def _fcst_cycled(self):
dependencies.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='or', dep=dependencies)
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource('fcst')
- task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
+ task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
+ cycledef=cycledef)
return task
@@ -679,9 +671,11 @@ def _get_postgroups(cdump, config, add_anl=False):
varval1, varval2, varval3 = _get_postgroups(self.cdump, self._configs[task_name], add_anl=add_anl_to_post)
vardict = {varname2: varval2, varname3: varval3}
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource(task_name)
task = create_wf_task(task_name, resources, cdump=self.cdump, envar=postenvars, dependency=dependencies,
- metatask=task_name, varname=varname1, varval=varval1, vardict=vardict)
+ metatask=task_name, varname=varname1, varval=varval1, vardict=vardict, cycledef=cycledef)
return task
@@ -913,8 +907,11 @@ def vrfy(self):
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep=deps)
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource('vrfy')
- task = create_wf_task('vrfy', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
+ task = create_wf_task('vrfy', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
+ cycledef=cycledef)
return task
@@ -961,8 +958,11 @@ def arch(self):
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource('arch')
- task = create_wf_task('arch', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
+ task = create_wf_task('arch', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
+ cycledef=cycledef)
return task
@@ -1171,9 +1171,10 @@ def efcs(self):
groups = self._get_hybgroups(self._base['NMEM_ENKF'], self._configs['efcs']['NMEM_EFCSGRP'])
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
resources = self.get_resource('efcs')
task = create_wf_task('efcs', resources, cdump=self.cdump, envar=efcsenvars, dependency=dependencies,
- metatask='efmn', varname='grp', varval=groups)
+ metatask='efmn', varname='grp', varval=groups, cycledef=cycledef)
return task
@@ -1188,8 +1189,11 @@ def echgres(self):
deps.append(rocoto.add_dependency(dep_dict))
dependencies = rocoto.create_dependency(dep_condition='and', dep=deps)
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource('echgres')
- task = create_wf_task('echgres', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies)
+ task = create_wf_task('echgres', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies,
+ cycledef=cycledef)
return task
@@ -1231,9 +1235,11 @@ def _get_eposgroups(epos):
varval1, varval2, varval3 = _get_eposgroups(self._configs['epos'])
vardict = {varname2: varval2, varname3: varval3}
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource('epos')
task = create_wf_task('epos', resources, cdump=self.cdump, envar=eposenvars, dependency=dependencies,
- metatask='epmn', varname=varname1, varval=varval1, vardict=vardict)
+ metatask='epmn', varname=varname1, varval=varval1, vardict=vardict, cycledef=cycledef)
return task
@@ -1251,9 +1257,11 @@ def earc(self):
groups = self._get_hybgroups(self._base['NMEM_ENKF'], self._configs['earc']['NMEM_EARCGRP'], start_index=0)
+ cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump
+
resources = self.get_resource('earc')
task = create_wf_task('earc', resources, cdump=self.cdump, envar=earcenvars, dependency=dependencies,
- metatask='eamn', varname='grp', varval=groups)
+ metatask='eamn', varname='grp', varval=groups, cycledef=cycledef)
return task
diff --git a/workflow/rocoto/workflow_xml.py b/workflow/rocoto/workflow_xml.py
index 440ff93db5..52ff86db2c 100644
--- a/workflow/rocoto/workflow_xml.py
+++ b/workflow/rocoto/workflow_xml.py
@@ -3,6 +3,7 @@
import os
from distutils.spawn import find_executable
from datetime import datetime
+from pygw.timetools import to_timedelta
from collections import OrderedDict
from applications import AppConfig
from rocoto.workflow_tasks import get_wf_tasks
@@ -110,29 +111,41 @@ def _get_cycledefs(self):
return cycledefs
def _get_cycledefs_cycled(self):
- sdate = self._base['SDATE'].strftime('%Y%m%d%H%M')
- edate = self._base['EDATE'].strftime('%Y%m%d%H%M')
+ sdate = self._base['SDATE']
+ edate = self._base['EDATE']
interval = self._base.get('INTERVAL', '06:00:00')
- strings = [f'\t{sdate} {edate} {interval}\n']
+ strings = []
+ strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {sdate.strftime("%Y%m%d%H%M")} {interval}')
+ sdate = sdate + to_timedelta(interval)
+ strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}')
if self._app_config.gfs_cyc != 0:
- sdate_gfs = self._base['SDATE_GFS'].strftime('%Y%m%d%H%M')
- edate_gfs = self._base['EDATE_GFS'].strftime('%Y%m%d%H%M')
+ sdate_gfs = self._base['SDATE_GFS']
+ edate_gfs = self._base['EDATE_GFS']
interval_gfs = self._base['INTERVAL_GFS']
- strings.append(f'\t{sdate_gfs} {edate_gfs} {interval_gfs}')
- strings.append('')
- strings.append('')
+ strings.append(f'\t{sdate_gfs.strftime("%Y%m%d%H%M")} {edate_gfs.strftime("%Y%m%d%H%M")} {interval_gfs}')
+
+ sdate_gfs = sdate_gfs + to_timedelta(interval_gfs)
+ if sdate_gfs <= edate_gfs:
+ strings.append(f'\t{sdate_gfs.strftime("%Y%m%d%H%M")} {edate_gfs.strftime("%Y%m%d%H%M")} {interval_gfs}')
+
+ strings.append('')
+ strings.append('')
return '\n'.join(strings)
def _get_cycledefs_forecast_only(self):
- sdate = self._base['SDATE'].strftime('%Y%m%d%H%M')
- edate = self._base['EDATE'].strftime('%Y%m%d%H%M')
+ sdate = self._base['SDATE']
+ edate = self._base['EDATE']
interval = self._base.get('INTERVAL_GFS', '24:00:00')
- cdump = self._base['CDUMP']
- strings = f'\t{sdate} {edate} {interval}\n\n'
+ strings = []
+ strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}')
+
+ sdate = sdate + to_timedelta(interval)
+ if sdate <= edate:
+ strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}')
- return strings
+ return '\n'.join(strings)
@staticmethod
def _get_workflow_footer():