Skip to content

Commit

Permalink
per #1687, added logic to run once for each valid time if looping by …
Browse files Browse the repository at this point in the history
…init and vice-versa. Allows wrappers that typically run once for each valid time like PB2NC to automatically skip times that have already been processed without having to configure the wrapper to skip if the output already exists
  • Loading branch information
georgemccabe committed Sep 28, 2022
1 parent 1514f4e commit adc3c7d
Showing 1 changed file with 84 additions and 10 deletions.
94 changes: 84 additions & 10 deletions metplus/wrappers/runtime_freq_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
@endcode
'''


class RuntimeFreqWrapper(CommandBuilder):

# valid options for run frequency
FREQ_OPTIONS = [
'RUN_ONCE',
'RUN_ONCE_PER_INIT_OR_VALID',
'RUN_ONCE_PER_INIT',
'RUN_ONCE_PER_VALID',
'RUN_ONCE_PER_LEAD',
'RUN_ONCE_FOR_EACH'
]
Expand Down Expand Up @@ -134,6 +137,16 @@ def run_all_times_custom(self, custom):
self.run_once(custom)
elif runtime_freq == 'RUN_ONCE_PER_INIT_OR_VALID':
self.run_once_per_init_or_valid(custom)
elif runtime_freq == 'RUN_ONCE_PER_VALID':
if not is_loop_by_init(self.config):
self.run_once_per_init_or_valid(custom)
else:
self.run_once_per_runtime(custom, 'valid')
elif runtime_freq == 'RUN_ONCE_PER_INIT':
if is_loop_by_init(self.config):
self.run_once_per_init_or_valid(custom)
else:
self.run_once_per_runtime(custom, 'init')
elif runtime_freq == 'RUN_ONCE_PER_LEAD':
self.run_once_per_lead(custom)
elif runtime_freq == 'RUN_ONCE_FOR_EACH':
Expand Down Expand Up @@ -189,6 +202,64 @@ def run_once_per_init_or_valid(self, custom):

return success

def run_once_per_runtime(self, custom, init_or_valid):
"""! Run once for each init or valid specified as an argument.
For example, if looping by INIT and running once per valid time,
compute valid time from init and forecast lead, then only run if
the valid time has not been run yet.
@param custom string from custom loop list
@param init_or_valid defines which time to run.
Acceptable values are 'init' or 'valid'.
@returns True on success, False if any run failed
"""
self.logger.debug(f"Running once for each {init_or_valid} time")

success = True
already_run = []
for time_input in time_generator(self.config):
if time_input is None:
success = False
continue

log_runtime_banner(self.config, time_input, self)
add_to_time_input(time_input,
instance=self.instance,
custom=custom)

lead_seq = get_lead_sequence(self.config, input_dict=None)
for lead in lead_seq:
time_input['lead'] = lead
time_info = time_util.ti_calculate(time_input)

self.logger.info(
f"Processing forecast lead {time_info['lead_string']}"
)

if skip_time(time_info, self.c_dict.get('SKIP_TIMES', {})):
self.logger.debug('Skipping run time')
continue

if time_info[init_or_valid] in already_run:
self.logger.debug(f"Already processed {init_or_valid} "
"time. Skipping.")
continue

# get files for current run time
self.c_dict['ALL_FILES'] = (
self.get_all_files_for_init_valid_lead(time_info)
)

# Run for given init/valid time and forecast lead combination
self.clear()
if not self.run_at_time_once(time_info):
success = False

# save run time to prevent it from being processed again
already_run.append(time_info[init_or_valid])

return success

def run_once_per_lead(self, custom):
self.logger.debug("Running once for forecast lead time")
success = True
Expand Down Expand Up @@ -248,17 +319,10 @@ def run_once_for_each(self, custom):
self.logger.debug('Skipping run time')
continue

# since run_all_times was not called (LOOP_BY=times) then
# get files for current run time
file_dict = self.get_files_from_time(time_info)
all_files = []
if file_dict:
if isinstance(file_dict, list):
all_files = file_dict
else:
all_files = [file_dict]

self.c_dict['ALL_FILES'] = all_files
self.c_dict['ALL_FILES'] = (
self.get_all_files_for_init_valid_lead(time_info)
)

# Run for given init/valid time and forecast lead combination
self.clear()
Expand All @@ -267,6 +331,16 @@ def run_once_for_each(self, custom):

return success

def get_all_files_for_init_valid_lead(self, time_info):
file_dict = self.get_files_from_time(time_info)
if not file_dict:
return []

if isinstance(file_dict, list):
return file_dict

return [file_dict]

def get_all_files(self, custom=None):
"""! Get all files that can be processed with the app.
@returns A dictionary where the key is the type of data that was found,
Expand Down

0 comments on commit adc3c7d

Please sign in to comment.