
Commit

need to check coords also for pressure derivation. fixed this in base deriver. typo in solar module.
bnb32 committed Sep 24, 2024
1 parent 5ad3a3f commit 97463e7
Showing 6 changed files with 28 additions and 48 deletions.
60 changes: 20 additions & 40 deletions sup3r/postprocessing/collectors/h5.py
@@ -573,9 +573,11 @@ def _collect_flist(
             logger.warning(msg)
             warn(msg)

-    def group_time_chunks(self, file_paths, n_writes=None):
-        """Group files by temporal_chunk_index. Assumes file_paths have a
-        suffix format like ``_{temporal_chunk_index}_{spatial_chunk_index}.h5``
+    def get_flist_chunks(self, file_paths, n_writes=None):
+        """Group files by temporal_chunk_index and then combine these groups
+        if ``n_writes`` is less than the number of time chunks. Assumes
+        file_paths have a suffix format like
+        ``_{temporal_chunk_index}_{spatial_chunk_index}.h5``

         Parameters
         ----------
@@ -587,56 +589,34 @@ def group_time_chunks(self, file_paths, n_writes=None):
         Returns
         -------
-        file_chunks : list
-            List of lists of file paths grouped by ``temporal_chunk_index``
+        flist_chunks : list
+            List of file list chunks. Used to split collection and writing into
+            multiple steps.
         """
-        file_split = {}
+        file_chunks = {}
         for file in file_paths:
             t_chunk, _ = self.get_chunk_indices(file)
-            file_split[t_chunk] = [*file_split.get(t_chunk, []), file]
-        file_chunks = list(file_split.values())
-
-        logger.debug(
-            f'Split file list into {len(file_chunks)} chunks '
-            'according to temporal chunk indices'
-        )
+            file_chunks[t_chunk] = [*file_chunks.get(t_chunk, []), file]
+
+        logger.debug(f'Grouped file list into {len(file_chunks)} time chunks.')

         if n_writes is not None:
             msg = (
                 f'n_writes ({n_writes}) must be less than or equal '
                 f'to the number of temporal chunks ({len(file_chunks)}).'
             )
             assert n_writes <= len(file_chunks), msg
-        return file_chunks
-
-    def get_flist_chunks(self, file_paths, n_writes=None):
-        """Get file list chunks based on n_writes. This first groups files
-        based on time index and then splits those groups into ``n_writes``
-
-        Parameters
-        ----------
-        file_paths : list
-            List of file paths to collect
-        n_writes : int | None
-            Number of writes to use for collection
-
-        Returns
-        -------
-        flist_chunks : list
-            List of file list chunks. Used to split collection and writing into
-            multiple steps.
-        """
-        flist_chunks = self.group_time_chunks(file_paths, n_writes=n_writes)
-        if n_writes is not None:
-            flist_chunks = np.array_split(flist_chunks, n_writes)
-            flist_chunks = [
-                np.concatenate(fp_chunk) for fp_chunk in flist_chunks
-            ]
-        logger.debug(
-            'Split file list into %s chunks according to n_writes=%s',
-            len(flist_chunks),
-            n_writes,
-        )
+        n_writes = n_writes or len(file_chunks)
+        tc_groups = np.array_split(list(file_chunks.keys()), n_writes)
+        fp_groups = [[file_chunks[tc] for tc in tcs] for tcs in tc_groups]
+        flist_chunks = [np.concatenate(group) for group in fp_groups]
+        logger.debug(
+            'Split file list into %s chunks according to n_writes=%s',
+            len(flist_chunks),
+            n_writes,
+        )
         return flist_chunks

     def collect_feature(
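
The refactor above merges ``group_time_chunks`` into ``get_flist_chunks``: files are first bucketed by temporal chunk index, then the bucket keys are split across ``n_writes`` output passes. Here is a minimal standalone sketch of that logic, with hypothetical file names and simple suffix parsing standing in for ``get_chunk_indices``:

    import numpy as np

    def get_flist_chunks(file_paths, n_writes=None):
        # Bucket files by temporal chunk index, parsed from the
        # '_{temporal_chunk_index}_{spatial_chunk_index}.h5' suffix
        file_chunks = {}
        for file in file_paths:
            t_chunk, _ = file.replace('.h5', '').split('_')[-2:]
            file_chunks[t_chunk] = [*file_chunks.get(t_chunk, []), file]

        # Split the temporal chunk keys across n_writes passes and
        # concatenate the file lists within each pass
        n_writes = n_writes or len(file_chunks)
        tc_groups = np.array_split(list(file_chunks.keys()), n_writes)
        fp_groups = [[file_chunks[tc] for tc in tcs] for tcs in tc_groups]
        return [np.concatenate(group) for group in fp_groups]

    files = [f'out_{t}_{s}.h5' for t in range(4) for s in range(2)]
    print([c.tolist() for c in get_flist_chunks(files, n_writes=2)])
    # [['out_0_0.h5', 'out_0_1.h5', 'out_1_0.h5', 'out_1_1.h5'],
    #  ['out_2_0.h5', 'out_2_1.h5', 'out_3_0.h5', 'out_3_1.h5']]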
2 changes: 1 addition & 1 deletion sup3r/preprocessing/derivers/base.py
@@ -206,7 +206,7 @@ def derive(self, feature) -> Union[np.ndarray, da.core.Array]:
             if compute_check is not None:
                 return compute_check

-            if fstruct.basename in self.data.features:
+            if fstruct.basename in self.data:
                 logger.debug(
                     'Attempting level interpolation for "%s"', feature
                 )
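
The one-line change widens the membership test from ``self.data.features`` to the whole container, which, per the commit message, also covers coordinates needed for pressure derivation. A toy sketch of the distinction, with hypothetical names (not the sup3r container):

    class MockData:
        """Hypothetical stand-in for the sup3r data container."""

        def __init__(self, features, coords):
            self.features = features  # data variables, e.g. ['u_100m']
            self.coords = coords      # coordinate variables, e.g. ['pressure']

        def __contains__(self, name):
            # Membership spans features *and* coords
            return name in self.features or name in self.coords

    data = MockData(features=['u_100m', 'v_100m'], coords=['pressure'])
    assert 'pressure' not in data.features  # the old check misses a coordinate
    assert 'pressure' in data               # the new check finds it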
6 changes: 3 additions & 3 deletions sup3r/preprocessing/derivers/methods.py
@@ -159,8 +159,8 @@ def compute(cls, data):
         return cloud_mask.astype(np.float32)


-class PressureNC(DerivedFeature):
-    """Pressure feature class for NETCDF data. Needed since P is perturbation
+class PressureWRF(DerivedFeature):
+    """Pressure feature class for WRF data. Needed since P is perturbation
     pressure.
     """

@@ -402,6 +402,7 @@ def compute(cls, data):
     'cloud_mask': CloudMask,
     'clearsky_ratio': ClearSkyRatio,
     'sza': Sza,
+    'pressure_(.*)': 'level_(.*)',
 }

 RegistryH5WindCC = {
@@ -429,7 +430,6 @@ def compute(cls, data):
     'relativehumidity_min_2m': 'hursmin',
     'relativehumidity_max_2m': 'hursmax',
     'clearsky_ratio': ClearSkyRatioCC,
-    'pressure_(.*)': 'level_(.*)',
     'temperature_(.*)': TempNCforCC,
     'temperature_2m': Tas,
     'temperature_max_2m': TasMax,
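
The two registry hunks above move the ``'pressure_(.*)': 'level_(.*)'`` alias out of the CC registry and into the earlier registry. A hypothetical sketch of how a regex-keyed registry entry like this can be resolved; the ``resolve`` helper is illustrative, not the sup3r implementation:

    import re

    REGISTRY = {'pressure_(.*)': 'level_(.*)'}

    def resolve(feature):
        """Map a requested feature onto its registry alias, if any."""
        for pattern, alias in REGISTRY.items():
            match = re.fullmatch(pattern, feature)
            if match:
                # Substitute the captured suffix into the alias pattern
                return alias.replace('(.*)', match.group(1))
        return feature

    print(resolve('pressure_700mb'))   # level_700mb
    print(resolve('temperature_2m'))   # temperature_2m (no pattern matches)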
2 changes: 1 addition & 1 deletion sup3r/solar/solar.py
@@ -705,7 +705,7 @@ def _run_temporal_chunk(
         fp_out = fp_target.replace('.h5', f'_{fp_out_suffix}.h5')

         if os.path.exists(fp_out):
-            logger.info('%s already exists. Skipping.')
+            logger.info('%s already exists. Skipping.', fp_out)

         else:
             logger.info(
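
The ``logger.info`` fix above is a classic lazy-logging slip: with no argument supplied, nothing is substituted and the literal placeholder is logged. A quick demonstration (the path is hypothetical):

    import logging

    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logger = logging.getLogger(__name__)

    fp_out = 'sup3r_chunk_000_000.h5'  # hypothetical output path

    logger.info('%s already exists. Skipping.')          # logs the literal '%s'
    logger.info('%s already exists. Skipping.', fp_out)  # logs the actual path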
2 changes: 1 addition & 1 deletion sup3r/solar/solar_cli.py
@@ -83,7 +83,7 @@ def from_config(ctx, config_file, verbose=False, pipeline_step=None):
         node_config['job_name'] = name
         node_config['pipeline_step'] = pipeline_step

-        node_config['temporal_ids'] = temporal_ids
+        node_config['temporal_ids'] = list(temporal_ids)
         cmd = Solar.get_node_cmd(node_config)

         if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS:
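
A plausible motivation for the ``list(...)`` cast, though the diff alone does not confirm it: the temporal ID groups may arrive as numpy arrays (e.g. from ``np.array_split``), which are not JSON serializable when the node config is written out. A sketch under that assumption:

    import json
    import numpy as np

    # np.array_split returns ndarrays, which json cannot serialize
    tid_groups = np.array_split(np.array(['000', '001', '002', '003']), 2)
    temporal_ids = tid_groups[0]

    try:
        json.dumps({'temporal_ids': temporal_ids})
    except TypeError as err:
        print(f'ndarray fails: {err}')

    # A plain list of (string) ids serializes cleanly
    print(json.dumps({'temporal_ids': list(temporal_ids)}))
    # {"temporal_ids": ["000", "001"]}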
4 changes: 2 additions & 2 deletions sup3r/utilities/utilities.py
@@ -60,9 +60,9 @@ def xr_open_mfdataset(files, **kwargs):
     try:
         return xr.open_mfdataset(files, **default_kwargs)
     except Exception as e:
-        msg = 'Could not use xr.open_mfdataset to open %s. '
+        msg = 'Could not use xr.open_mfdataset to open %s. %s'
         if len(files) == 1:
-            raise RuntimeError(msg % files) from e
+            raise RuntimeError(msg % (files, e)) from e
         msg += 'Trying to open them separately and merge. %s'
         logger.warning(msg, files, e)
         warn(msg % (files, e))
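
A quick placeholder-arity check on the message built above, assuming the ``msg +=`` context line really is unchanged by this commit: after the concatenation the string carries three ``%s`` slots, so %-formatting with two arguments no longer suffices on the multi-file path:

    files = ['a.nc', 'b.nc']
    err = ValueError('bad coords')

    msg = 'Could not use xr.open_mfdataset to open %s. %s'
    msg += 'Trying to open them separately and merge. %s'

    try:
        msg % (files, err)  # two args, three '%s' slots
    except TypeError as e:
        print(e)  # not enough arguments for format string

    print(msg % (files, err, err))  # three args format cleanly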
