Use openghg combine datasets #160

Merged (3 commits) on Jul 5, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

# Version 0.2.0

- Replaced `utils.combine_datasets` with the (nearly) equivalent function from `openghg.analyse._scenario`. A thin wrapper currently ensures that the second dataset is loaded into memory, since the corresponding `.load()` change is only on the devel branch of OpenGHG. [#PR 160](https://github.com/openghg/openghg_inversions/pull/160)

- Moved filters from `utils.py` to new submodule `filters.py` [#PR 159](https://github.com/openghg/openghg_inversions/pull/159)

- Removed `site_info.json` and `species_info.json` and replaced with calls to functions in `openghg.util`, which pull the same info from `openghg_defs`. [#PR 152](https://github.com/openghg/openghg_inversions/pull/152)
114 changes: 30 additions & 84 deletions openghg_inversions/utils.py
@@ -20,6 +20,7 @@
import pandas as pd
import xarray as xr
from openghg.analyse import ModelScenario
from openghg.analyse import combine_datasets as openghg_combine_datasets
from openghg.util import get_species_info, synonyms
from tqdm import tqdm

@@ -30,6 +31,34 @@
openghginv_path = Paths.openghginv


def combine_datasets(
    dataset_A: xr.Dataset,
    dataset_B: xr.Dataset,
    method: Optional[str] = "nearest",
    tolerance: Optional[float] = None,
) -> xr.Dataset:
    """
    Merge two datasets, re-indexing to the first dataset.

    If the "fp" variable is found within the combined dataset,
    the "time" values where the "lat" and "lon" dimensions didn't match are removed.

    NOTE: this is a temporary solution while waiting for `.load()` to be added to the
    openghg version of `combine_datasets`.

    Args:
        dataset_A: First dataset to merge
        dataset_B: Second dataset to merge
        method: One of None, "nearest", "ffill", "bfill".
            See xarray.DataArray.reindex_like for the list of options and their meaning.
            Defaults to "nearest".
        tolerance: Maximum allowed tolerance between matches.

    Returns:
        xarray.Dataset: Combined dataset indexed to dataset_A
    """
    return openghg_combine_datasets(dataset_A, dataset_B.load(), method=method, tolerance=tolerance)
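
For illustration, a minimal self-contained sketch of how this wrapper behaves, using synthetic datasets (the names and values are hypothetical, and the reindex-then-merge behaviour is assumed to be as the docstring above describes):

```python
import numpy as np
import pandas as pd
import xarray as xr

# Two small datasets whose lat/lon grids are slightly offset.
lat, lon = np.array([50.0, 51.0]), np.array([0.0, 1.0])
time = pd.date_range("2019-01-01", periods=2)

ds_a = xr.Dataset(
    {"fp": (("lat", "lon", "time"), np.ones((2, 2, 2)))},
    coords={"lat": lat, "lon": lon, "time": time},
)
ds_b = xr.Dataset(
    {"flux": (("lat", "lon"), np.array([[1.0, 2.0], [3.0, 4.0]]))},
    coords={"lat": lat + 0.01, "lon": lon + 0.01},  # offset grid
)

# ds_b is loaded into memory, then snapped onto ds_a's coordinates
# with method="nearest" before the merge.
combined = combine_datasets(ds_a, ds_b)
print(combined.flux.values)  # [[1. 2.] [3. 4.]] on ds_a's grid
```
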


def open_ds(path, chunks=None, combine=None):
"""
Function efficiently opens xarray datasets.
@@ -297,89 +326,6 @@ def basis_boundary_conditions(domain, basis_case, bc_basis_directory=None):
    return basis_ds


def indexesMatch(dsa, dsb):
    """
    Check if two datasets need to be reindexed_like for combine_datasets
    -----------------------------------
    Args:
        dsa (xarray.Dataset):
            First dataset to check
        dsb (xarray.Dataset):
            Second dataset to check

    Returns:
        boolean:
            True if indexes match, False if datasets must be reindexed
    -----------------------------------
    """
    commonIndicies = [key for key in dsa.indexes.keys() if key in dsb.indexes.keys()]

    # test if each common index is the same
    for index in commonIndicies:
        # first check lengths are the same to avoid an error in the second check
        if not len(dsa.indexes[index]) == len(dsb.indexes[index]):
            return False

        # check the number of values that are not close (testing for equality with floating point)
        if index == "time":
            # for time, override the default to get ~ second precision
            rtol = 1e-10
        else:
            rtol = 1e-5

        num_not_close = np.sum(
            ~np.isclose(
                dsa.indexes[index].values.astype(float),
                dsb.indexes[index].values.astype(float),
                rtol=rtol,
            )
        )
        if num_not_close > 0:
            return False

    return True
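
A side note on the `rtol=1e-10` choice above: `datetime64[ns]` indexes cast to float are nanoseconds since the epoch (~1.5e18 for recent dates), so a relative tolerance of 1e-10 corresponds to roughly 0.15 s in absolute terms. A quick standalone check (hypothetical values):

```python
import numpy as np
import pandas as pd

t = pd.date_range("2019-01-01", periods=3, freq="h")
t_shifted = t + pd.Timedelta("50ms")  # well under ~0.15 s

# Cast to float (nanoseconds since epoch) and compare, as indexesMatch does.
print(np.isclose(t.values.astype(float), t_shifted.values.astype(float), rtol=1e-10))
# [ True  True  True] -- a 50 ms offset still counts as matching
```
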


def combine_datasets(dsa, dsb, method="nearest", tolerance: Optional[float] = None) -> xr.Dataset:
    """
    Merge two datasets, re-indexing to the first dataset (within an optional tolerance).

    If the "fp" variable is found within the combined dataset, the "time" values where the
    "lat", "lon" dimensions didn't match are removed.

    Example:
        ds = combine_datasets(dsa, dsb)

    Args:
        dsa (xarray.Dataset):
            First dataset to merge
        dsb (xarray.Dataset):
            Second dataset to merge
        method: One of {None, "nearest", "pad"/"ffill", "backfill"/"bfill"}.
            See xarray.DataArray.reindex_like for the list of options and their meaning.
            Defaults to "nearest".
        tolerance: Maximum allowed (absolute) tolerance between matches.

    Returns:
        xarray.Dataset: combined dataset indexed to dsa
    """
    # merge the two datasets within a tolerance and remove times that are NaN (i.e. when FPs don't exist)
    if not indexesMatch(dsa, dsb):
        dsb_temp = dsb.load().reindex_like(dsa, method, tolerance=tolerance)
    else:
        dsb_temp = dsb

    ds_temp = dsa.merge(dsb_temp)

    if "fp" in ds_temp:
        mask = np.isfinite(ds_temp.fp.sum(dim=["lat", "lon"], skipna=False))
        ds_temp = ds_temp.where(mask.as_numpy(), drop=True)  # .as_numpy() in case the mask is chunked

    return ds_temp
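
The final masking step above can be illustrated in isolation (synthetic data): a NaN anywhere in the lat/lon plane of "fp" at a given time makes that time's sum NaN because `skipna=False`, and `where(..., drop=True)` then removes that time step:

```python
import numpy as np
import xarray as xr

fp = np.ones((3, 2, 2))  # dims: (time, lat, lon)
fp[1, 0, 0] = np.nan     # one bad grid cell at time index 1

ds = xr.Dataset(
    {"fp": (("time", "lat", "lon"), fp)},
    coords={"time": [0, 1, 2], "lat": [50.0, 51.0], "lon": [0.0, 1.0]},
)

mask = np.isfinite(ds.fp.sum(dim=["lat", "lon"], skipna=False))
print(ds.where(mask, drop=True).time.values)  # [0 2] -- time 1 dropped
```
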


def timeseries_HiTRes(
flux_dict,
fp_HiTRes_ds=None,
@@ -818,7 +764,7 @@ def fp_sensitivity_single_site_basis_func(
)

else:
site_bf = combine_datasets(scenario["fp"].to_dataset(), flux.data)
site_bf = combine_datasets(scenario["fp"].to_dataset(), flux.data, method="nearest")
H_all = site_bf.fp * site_bf.flux

H_all_v = H_all.values.reshape((len(site_bf.lat) * len(site_bf.lon), len(site_bf.time)))
2 changes: 1 addition & 1 deletion tests/test_utils.py
@@ -9,7 +9,7 @@ def test_combine_datasets():
    fp = get_footprint(site="tac", domain="europe").data
    flux = get_flux(species="ch4", source="total-ukghg-edgar7", domain="europe").data

    comb = combine_datasets(fp, flux)
    comb = combine_datasets(fp, flux, method="nearest")

    with pytest.raises(AssertionError) as exc_info:
        xr.testing.assert_allclose(flux.flux.squeeze("time").drop_vars("time"), comb.flux.isel(time=0))