Skip to content

Commit

Permalink
Add Time Series Capability (JCSDA-internal#187)
Browse files Browse the repository at this point in the history
## Description

Add the capability to read and transform data on the fly and assemble a
dataset that has a time series dimension.

- Add test for computing mean of omb and making time series.
- Add test for plotting bias correction coefficients time series.
- Add var bc data.
- Add reader for JEDI bias and tlapse files.
- Remove printing of datasets from the read and transform classes and
add it at the driver level.

The plots created by the new tests are shown below. Note that the tests are somewhat
contrived since the data is the same at both time steps. We can try to
improve on that in the future.


![time_series_omb](https://github.com/JCSDA-internal/eva/assets/27729500/e5d04412-6bde-4f7f-9e4c-da42ca6214d6)


![varbc_time_series](https://github.com/JCSDA-internal/eva/assets/27729500/2ee6db9a-c8ae-476c-b63c-ff9e1782584b)

## Dependencies

NONE

## Impact

NONE

---------

Co-authored-by: danholdaway <danholdaway@users.noreply.github.com>
Co-authored-by: Cory Martin <cory.r.martin@noaa.gov>
Co-authored-by: Akira Sewnath <asewnath@users.noreply.github.com>
  • Loading branch information
4 people authored Jun 5, 2024
1 parent 1df619e commit d60260f
Show file tree
Hide file tree
Showing 30 changed files with 775 additions and 101 deletions.
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
*.res filter=lfs diff=lfs merge=lfs -text
*.odb filter=lfs diff=lfs merge=lfs -text
*.ipynb filter=lfs diff=lfs merge=lfs -text
*.satbias filter=lfs diff=lfs merge=lfs -text
*.tlapse filter=lfs diff=lfs merge=lfs -text
3 changes: 0 additions & 3 deletions src/eva/data/csv_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ def execute(self, dataset_config, data_collections, timing):
data_collections.create_or_add_to_collection(collection_name, ds)
ds.close()

# Display the contents of the collections for helping the user with making plots
data_collections.display_collections()

# ----------------------------------------------------------------------------------------------

def generate_default_config(self, filenames, collection_name):
Expand Down
4 changes: 0 additions & 4 deletions src/eva/data/cubed_sphere_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ def execute(self, dataset_config, data_collections, timing):
# -------------------------
data_collections.nan_float_values_outside_threshold(threshold)

# Display the contents of the collections for helping the user with making plots
# -------------------------
data_collections.display_collections()

# ----------------------------------------------------------------------------------------------

def generate_default_config(self, filenames, collection_name):
Expand Down
49 changes: 45 additions & 4 deletions src/eva/data/data_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class DataCollections:

"""Manage collections of xarray Datasets with variable manipulations."""

def __init__(self):
def __init__(self, time_series=False):

"""Initialize the DataCollections instance."""

Expand All @@ -42,6 +42,9 @@ def __init__(self):
# Create a logger
self.logger = Logger('DataCollections')

# If this is a time series, store it
self.time_series = False if not time_series else True

# ----------------------------------------------------------------------------------------------

def create_or_add_to_collection(self, collection_name, collection, concat_dimension=None):
Expand All @@ -61,6 +64,11 @@ def create_or_add_to_collection(self, collection_name, collection, concat_dimens
ValueError: If concatenation dimension is missing or invalid.
"""

# If time_series collection name must also be time_series
if self.time_series and collection_name != 'time_series':
self.logger.abort('In create_or_add_to_collection: time_series collection must ' +
'be \'time_series\'')

# Collections should only be xarray datasets
if not isinstance(collection, Dataset):
self.logger.abort('In add_collection: collection must be an xarray.Dataset')
Expand Down Expand Up @@ -149,6 +157,11 @@ def add_variable_to_collection(self, collection_name, group_name, variable_name,
ValueError: If variable is not an xarray DataArray.
"""

# If time_series collection name must also be time_series
if self.time_series and collection_name != 'time_series':
self.logger.abort('In add_variable_to_collection: time_series collection must ' +
'be \'time_series\'')

# Assert that new variable is an xarray Dataarray
if not isinstance(variable, DataArray):
self.logger.abort('In add_variable_to_collection: variable must be xarray.DataArray')
Expand Down Expand Up @@ -197,6 +210,11 @@ def get_variable_data_array(self, collection_name, group_name, variable_name,
is missing.
"""

# If time_series collection name must also be time_series
if self.time_series and collection_name != 'time_series':
self.logger.abort('In get_variable_data_array: time_series collection must ' +
'be \'time_series\'')

group_variable_name = group_name + '::' + variable_name
data_array = self._collections[collection_name][group_variable_name]

Expand Down Expand Up @@ -274,6 +292,11 @@ def get_variable_data(self, collection_name, group_name, variable_name,
ndarray: The selected variable data as a NumPy array.
"""

# If time_series collection name must also be time_series
if self.time_series and collection_name != 'time_series':
self.logger.abort('In get_variable_data: time_series collection must ' +
'be \'time_series\'')

variable_array = self.get_variable_data_array(collection_name, group_name, variable_name,
channels, levels, datatypes)

Expand Down Expand Up @@ -378,6 +401,7 @@ def display_collections(self):
'float32': '{:+.4e}',
'int64': '{:+11d}',
'int32': '{:+11d}',
'datetime64[ns]': '{}'
}

# Display a list of variables that are available in the collection
Expand All @@ -388,7 +412,7 @@ def display_collections(self):
self.logger.info('Collection name: ' + fcol.underline + collection + fcol.end)
self.logger.info('\n Dimensions:')
for dim in list(self._collections[collection].dims):
dim_value = self._collections[collection].dims[dim]
dim_value = self._collections[collection].sizes[dim]
self.logger.info(f' {dim}: {dim_value}')
self.logger.info('\n Coordinates:')
for coord in list(self._collections[collection].coords):
Expand All @@ -411,8 +435,25 @@ def display_collections(self):
rms = np.sqrt(np.nanmean(data_var_value**2))
rms_string = ', RMS=' + minmaxrms_format.format(rms)
minmaxrms_string = ' | ' + min_string + ', ' + max_string + rms_string
self.logger.info(' ' + data_var.ljust(max_name_len) + ' (' +
str(data_var_value.dtype).ljust(7) + ')' + minmaxrms_string)
full_str = ' ' + data_var.ljust(max_name_len) + ' (' + \
str(data_var_value.dtype)[0:7].ljust(7) + ')' + minmaxrms_string
else:
# No min/max
min_string = ''
max_string = ''
minmaxrms_string = ' | ' + min_string + ', ' + max_string
full_str = ' ' + data_var.ljust(max_name_len) + ' (' + \
str(data_var_value.dtype)[0:7].ljust(7) + ')' + minmaxrms_string
self.logger.info(full_str)

# Add the raw xarray display of the collection for more information about coords/dims
self.logger.info(' ')
self.logger.info('/'*80)
self.logger.info(' ')
self.logger.info(f'Raw xarray display of the {fcol.underline + collection + fcol.end} ' +
'collection:')
self.logger.info(' ')
self.logger.info(str(self._collections[collection]))
self.logger.info('-'*80)

# ----------------------------------------------------------------------------------------------
47 changes: 16 additions & 31 deletions src/eva/data/data_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,40 @@
# --------------------------------------------------------------------------------------------------


from eva.utilities.config import get
from eva.data.eva_dataset_base import EvaDatasetFactory

import importlib
import os


# --------------------------------------------------------------------------------------------------

def data_driver(config, data_collections, timing, logger):
def data_driver(dataset_config, data_collections, timing, logger):

"""
Driver for executing data processing.
Args:
config (dict): Configuration settings for data processing.
dataset_config (dict): Configuration settings for data processing.
data_collections (DataCollections): Instance of the DataCollections class.
timing (Timing): Timing instance for performance measurement.
logger (Logger): Logger instance for logging messages.
"""

# Get list of dataset dictionaries
datasets = get(config, logger, 'datasets')

# Loop over datasets
for dataset in datasets:
# Check if the dataset_config contains the 'type' key
logger.assert_abort('type' in dataset_config, 'Each dataset must have a \'type\' key')

# Extract name for this diagnostic data type
try:
eva_data_class_name = dataset['type']
except Exception as e:
msg = '\'type\' key not found. \'diagnostic_data_config\': ' \
f'{diagnostic_data_config}, error: {e}'
raise KeyError(msg)
# Extract name for this diagnostic data type
eva_data_class_name = dataset_config['type']

# Create the data object
creator = EvaDatasetFactory()
timing.start('DataObjectConstructor')
eva_data_object = creator.create_eva_object(eva_data_class_name,
'data',
logger,
timing)
timing.stop('DataObjectConstructor')
# Create the data object
creator = EvaDatasetFactory()
timing.start('DataObjectConstructor')
eva_data_object = creator.create_eva_object(eva_data_class_name, 'data', logger, timing)
timing.stop('DataObjectConstructor')

# Prepare diagnostic data
logger.info(f'Running execute for {eva_data_object.name}')
timing.start('DataObjectExecute')
eva_data_object.execute(dataset, data_collections, timing)
timing.stop('DataObjectExecute')
# Prepare diagnostic data
logger.info(f'Running execute for {eva_data_object.name}')
timing.start('DataObjectExecute')
eva_data_object.execute(dataset_config, data_collections, timing)
timing.stop('DataObjectExecute')

# --------------------------------------------------------------------------------------------------
3 changes: 0 additions & 3 deletions src/eva/data/geoval_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ def execute(self, dataset_config, data_collections, timing):
# Nan out unphysical values
data_collections.nan_float_values_outside_threshold(threshold)

# Display the contents of the collections for helping the user with making plots
data_collections.display_collections()

def generate_default_config(self, filenames, collection_name):

"""
Expand Down
3 changes: 0 additions & 3 deletions src/eva/data/gsi_obs_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ def execute(self, dataset_config, data_collections, timeing):
# Change the channel dimension name
data_collections.adjust_channel_dimension_name('nchans')

# Display the contents of the collections for helping the user with making plots
data_collections.display_collections()

# ----------------------------------------------------------------------------------------------

def generate_default_config(self, filenames, collection_name):
Expand Down
3 changes: 0 additions & 3 deletions src/eva/data/ioda_obs_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ def execute(self, dataset_config, data_collections, timing):
# Nan out unphysical values
data_collections.nan_float_values_outside_threshold(threshold)

# Display the contents of the collections for helping the user with making plots
data_collections.display_collections()

def generate_default_config(self, filenames, collection_name):

"""
Expand Down
3 changes: 0 additions & 3 deletions src/eva/data/jedi_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ def execute(self, dataset_config, data_collections, timing):
# Add to the Eva dataset
data_collections.create_or_add_to_collection(collection_name, convergence_ds)

# Write out all the collections
data_collections.display_collections()

# ----------------------------------------------------------------------------------------------

def get_from_log(self, search_term, separator, position, custom_log=None):
Expand Down
Loading

0 comments on commit d60260f

Please sign in to comment.