
Commit

per #1610, break up large function into smaller components to prevent SQ code smell (cognitive complexity)
georgemccabe committed Mar 22, 2023
1 parent 30a2fba commit 38fb680
Showing 2 changed files with 164 additions and 89 deletions.
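
For orientation before the diffs: the refactor moves the inline loops of parse_var_list into the new helper functions introduced by this commit. The following is a simplified sketch of the resulting control flow, condensed from the diff below; the BOTH/validation checks, error returns, and debug output are omitted, so it is illustrative rather than a verbatim copy of the new function body.

def parse_var_list(config, time_info=None, data_type=None, met_tool=None,
                   levels_as_list=False):
    # condensed sketch: validation checks and debug logging omitted
    data_types = [data_type] if data_type else ['FCST', 'OBS']

    # indices of <data_type>_VAR<n>_* config variables to process
    indices = _get_var_name_indices(config, data_types, met_tool)

    # config variable name prefixes to search for each data type
    dt_search_prefixes = _get_all_field_search_prefixes(data_types, met_tool)

    var_list = []
    for index in indices:
        # read and format field info for each requested data type
        field_list = _get_field_list(index, data_types, dt_search_prefixes,
                                     config, time_info)
        if not field_list or len(data_types) != len(field_list):
            continue

        if levels_as_list:
            # one entry per VAR<n> index with all levels kept as a list
            var_list.append(_get_field_all_levels(index, field_list))
        else:
            # one entry per level value
            n_levels = len(field_list[0]['levels'])
            var_list.extend(_get_field_each_level(index, n_levels, field_list))

    return sorted(var_list, key=lambda x: x['index'])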
@@ -162,8 +162,8 @@ def test_find_var_indices_fcst(metplus_config,
 )
 @pytest.mark.util
 def test_get_field_search_prefixes(data_type, met_tool, expected_out):
-    assert(config_metplus.get_field_search_prefixes(data_type,
-                                                    met_tool) == expected_out)
+    assert(config_metplus._get_field_search_prefixes(data_type,
+                                                     met_tool) == expected_out)


# search prefixes are valid prefixes to append to field info variables
metplus/util/config_metplus.py (249 changes: 162 additions & 87 deletions)
@@ -921,113 +921,48 @@ def parse_var_list(config, time_info=None, data_type=None, met_tool=None,
     # validate configs again in case wrapper is not running from run_metplus
     # this does not need to be done if parsing a specific data type,
     # i.e. ENS or FCST
-    if data_type is None:
-        if not validate_field_info_configs(config)[0]:
-            return []
-    elif data_type == 'BOTH':
+    if data_type == 'BOTH':
         config.logger.error("Cannot request BOTH explicitly in parse_var_list")
         return []
 
-    # var_list is a list containing an list of dictionaries
-    var_list = []
+    if data_type is None and not validate_field_info_configs(config)[0]:
+        return []
 
-    # if specific data type is requested, only get that type
-    if data_type:
-        data_types = [data_type]
-    # otherwise get both FCST and OBS
-    else:
-        data_types = ['FCST', 'OBS']
+    data_types = [data_type] if data_type else ['FCST', 'OBS']
 
     # get indices of VAR<n> items for data type and/or met tool
-    indices = []
-    if met_tool:
-        indices = _find_var_name_indices(config, data_types, met_tool)
+    indices = _get_var_name_indices(config, data_types, met_tool)
     if not indices:
-        indices = _find_var_name_indices(config, data_types)
+        return []
 
+    # list of dictionaries that contain variable/field information
+    var_list = []
 
-    # get config name prefixes for each data type to find
-    dt_search_prefixes = {}
-    for current_type in data_types:
-        # get list of variable prefixes to search
-        prefixes = get_field_search_prefixes(current_type, met_tool)
-        dt_search_prefixes[current_type] = prefixes
+    dt_search_prefixes = _get_all_field_search_prefixes(data_types, met_tool)
 
     # loop over all possible variables and add them to list
     for index in indices:
-        field_info_list = []
-        for current_type in data_types:
-            # get dictionary of existing config variables to use
-            search_prefixes = dt_search_prefixes[current_type]
-            field_configs = get_field_config_variables(config,
-                                                       index,
-                                                       search_prefixes)
-
-            field_info = _format_var_items(field_configs, time_info)
-            if not isinstance(field_info, dict):
-                config.logger.error(f'Could not process {current_type}_'
-                                    f'VAR{index} variables: {field_info}')
-                continue
-
-            field_info['data_type'] = current_type.lower()
-            field_info_list.append(field_info)
+        field_list = _get_field_list(index, data_types, dt_search_prefixes,
+                                     config, time_info)
 
         # check that all fields types were found
-        if not field_info_list or len(data_types) != len(field_info_list):
+        if not field_list or len(data_types) != len(field_list):
            continue
 
         # check if number of levels for each field type matches
-        n_levels = len(field_info_list[0]['levels'])
-        if len(data_types) > 1:
-            if n_levels != len(field_info_list[1]['levels']):
-                continue
+        n_levels = len(field_list[0]['levels'])
+        if (len(data_types) > 1 and
+                n_levels != len(field_list[1]['levels'])):
+            continue
 
         # if requested, put all field levels in a single item
         if levels_as_list:
-            var_dict = {}
-            for field_info in field_info_list:
-                current_type = field_info.get('data_type')
-                var_dict[f"{current_type}_name"] = field_info.get('name')
-                var_dict[f"{current_type}_level"] = field_info.get('levels')
-                var_dict[f"{current_type}_thresh"] = field_info.get('thresh')
-                var_dict[f"{current_type}_extra"] = field_info.get('extra')
-                var_dict[f"{current_type}_output_name"] = field_info.get('output_names')
-
-            var_dict['index'] = index
-            var_list.append(var_dict)
+            var_list.append(_get_field_all_levels(index, field_list))
             continue
 
-        # loop over levels and add all values to output dictionary
-        for level_index in range(n_levels):
-            var_dict = {}
-
-            # get level values to use for string substitution in name
-            # used for python embedding calls that read the level value
-            sub_info = {}
-            for field_info in field_info_list:
-                dt_level = f"{field_info.get('data_type')}_level"
-                sub_info[dt_level] = field_info.get('levels')[level_index]
-
-            for field_info in field_info_list:
-                current_type = field_info.get('data_type')
-                name = field_info.get('name')
-                level = field_info.get('levels')[level_index]
-                thresh = field_info.get('thresh')
-                extra = field_info.get('extra')
-                output_name = field_info.get('output_names')[level_index]
-
-                # substitute level in name if filename template is specified
-                subbed_name = do_string_sub(name,
-                                            skip_missing_tags=True,
-                                            **sub_info)
-
-                var_dict[f"{current_type}_name"] = subbed_name
-                var_dict[f"{current_type}_level"] = level
-                var_dict[f"{current_type}_thresh"] = thresh
-                var_dict[f"{current_type}_extra"] = extra
-                var_dict[f"{current_type}_output_name"] = output_name
-
-            var_dict['index'] = index
-            var_list.append(var_dict)
+        var_list.extend(_get_field_each_level(index, n_levels, field_list))
 
     # extra debugging information used for developer debugging only
     '''
@@ -1064,6 +999,31 @@ def parse_var_list(config, time_info=None, data_type=None, met_tool=None,
     return sorted(var_list, key=lambda x: x['index'])
 
 
+def _get_var_name_indices(config, data_types, met_tool):
+    """!Get list of indices of field variables from config.
+    Look for wrapper-specific first. If no indices are found, look for generic
+    variables.
+    @param config METplusConfig object to parse
+    @param data_types list of prefixes of config variables that describe the
+     type of data e.g. FCST or OBS.
+    @param met_tool name of wrapper to search for wrapper-specific
+     variables, e.g. *_GRID_STAT_VAR<n>_*.
+    @returns list of integers for all matching config variables
+    """
+    indices = []
+
+    # look for wrapper-specific variables first
+    if met_tool:
+        indices = _find_var_name_indices(config, data_types, met_tool)
+
+    # if no wrapper-specific variables are found, look for generic variables
+    if not indices:
+        indices = _find_var_name_indices(config, data_types)
+
+    return indices
+
+
 def _find_var_name_indices(config, data_types, met_tool=None):
     """!Get list of indices used in _VAR<n>_ config variables. Data type
     determines prefix of variable name to find. If FCST or OBS is included
@@ -1101,15 +1061,112 @@ def _find_var_name_indices(config, data_types, met_tool=None):
     return [int(index) for index in indices]
 
 
+def _get_field_list(index, data_types, dt_search_prefixes, config, time_info):
+    """!Get list of field information.
+    @param index integer index for fields to search
+    @param data_types list of prefixes of config variables that describe the
+     type of data e.g. FCST or OBS
+    @param dt_search_prefixes dictionary of search strings to find variables
+    @param config METplusConfig object to query
+    @param time_info dictionary containing time info for current run
+    @returns list of dictionaries that contain field information
+    """
+    field_list = []
+    for current_type in data_types:
+        # get dictionary of existing config variables to use
+        search_prefixes = dt_search_prefixes[current_type]
+        field_configs = get_field_config_variables(config, index,
+                                                   search_prefixes)
+        field_info = _format_var_items(field_configs, time_info)
+        if not isinstance(field_info, dict):
+            config.logger.error(f'Could not process {current_type}_'
+                                f'VAR{index} variables: {field_info}')
+            continue
+
+        field_info['data_type'] = current_type.lower()
+        field_list.append(field_info)
+
+    return field_list


+def _get_field_all_levels(index, field_list):
+    """!Create dictionary of field information with all levels contained as a
+    list in a single item.
+    @param index integer index for fields to search
+    @param field_list list of dictionaries that contain field information
+    @returns dictionary containing field information with data type in keys
+    """
+    var_dict = {}
+    for field in field_list:
+        current_type = field.get('data_type')
+        var_dict[f"{current_type}_name"] = field.get('name')
+        var_dict[f"{current_type}_level"] = field.get('levels')
+        var_dict[f"{current_type}_thresh"] = field.get('thresh')
+        var_dict[f"{current_type}_extra"] = field.get('extra')
+        var_dict[f"{current_type}_output_name"] = field.get('output_names')
+
+    var_dict['index'] = index
+    return var_dict


+def _get_field_each_level(index, n_levels, field_list):
+    """!Create list of dictionaries of field information with a separate entry
+    for each level value, creating a list of name/level pairs.
+    @param index integer index for fields to search
+    @param n_levels integer number of levels to read
+    @param field_list list of dictionaries that contain field information
+    @returns list of dictionaries containing field information with
+     data type in keys
+    """
+    new_var_list = []
+    # loop over levels and add all values to output dictionary
+    for level_index in range(n_levels):
+        var_dict = {}
+
+        # get level values to use for string substitution in name
+        # used for python embedding calls that read the level value
+        sub_info = {}
+        for field_info in field_list:
+            dt_level = f"{field_info.get('data_type')}_level"
+            sub_info[dt_level] = field_info.get('levels')[level_index]
+
+        for field_info in field_list:
+            current_type = field_info.get('data_type')
+            name = field_info.get('name')
+            level = field_info.get('levels')[level_index]
+            thresh = field_info.get('thresh')
+            extra = field_info.get('extra')
+            output_name = field_info.get('output_names')[level_index]
+
+            # substitute level in name if filename template is specified
+            subbed_name = do_string_sub(name,
+                                        skip_missing_tags=True,
+                                        **sub_info)
+
+            var_dict[f"{current_type}_name"] = subbed_name
+            var_dict[f"{current_type}_level"] = level
+            var_dict[f"{current_type}_thresh"] = thresh
+            var_dict[f"{current_type}_extra"] = extra
+            var_dict[f"{current_type}_output_name"] = output_name
+
+        var_dict['index'] = index
+        new_var_list.append(var_dict)
+
+    return new_var_list


 def _format_var_items(field_configs, time_info=None):
     """! Substitute time information into field information and format values.
     @param field_configs dictionary with config variable names to read
     @param time_info dictionary containing time info for current run
     @returns dictionary containing name, levels, and output_names, as
      well as thresholds and extra options if found. If not enough
-     information was set in the METplusConfig object, an empty
-     dictionary is returned.
+     information was set in the METplusConfig object, a string describing
+     the error is returned
     """
     # dictionary to hold field (var) item info
     var_items = {}
@@ -1188,7 +1245,25 @@ def _format_var_items(field_configs, time_info=None):
     return var_items
 
 
-def get_field_search_prefixes(data_type, met_tool=None):
+def _get_all_field_search_prefixes(data_types, met_tool):
+    """!Populate dictionary with prefixes to search for config variables.
+    @param data_types list of field types to search, i.e. FCST, OBS, ENS, etc.
+    @param met_tool name of tool to search for variable or None if looking
+     for generic field info
+    @returns dictionary where keys are a data type and value is a list of
+     search prefixes
+    """
+    dt_search_prefixes = {}
+    for current_type in data_types:
+        # get list of variable prefixes to search
+        prefixes = _get_field_search_prefixes(current_type, met_tool)
+        dt_search_prefixes[current_type] = prefixes
+
+    return dt_search_prefixes
+
+
+def _get_field_search_prefixes(data_type, met_tool=None):
     """! Get list of prefixes to search for field variables.
     @param data_type type of field to search for, i.e. FCST, OBS, ENS, etc.
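For reference, a minimal, hypothetical example of how calling code might use parse_var_list after this change is sketched below; the refactor is intended to preserve its behavior and signature. The config object setup is elided and the met_tool value is purely illustrative.

from metplus.util import config_metplus

# 'config' is assumed to be an existing METplusConfig object in which
# FCST_VAR1_NAME, FCST_VAR1_LEVELS, etc. have already been set
var_list = config_metplus.parse_var_list(config, data_type='FCST',
                                         met_tool='grid_stat')
for var_info in var_list:
    print(var_info['fcst_name'], var_info['fcst_level'])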
