Obs error clean up #144

Merged · 20 commits · Jul 1, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,9 @@
# OpenGHG Inversions Change Log

# Version 0.2.0

- Fixed "add averaging" functional, which adds the variability of obs over a resampling period to the measurement error (repeatability). This closes [Issue #42](https://github.com/openghg/openghg_inversions/issues/42) . [#PR 144](https://github.com/openghg/openghg_inversions/pull/144)

- Add option to pass the filters as a dictionary (with the sites as keys). [#PR 135](https://github.com/openghg/openghg_inversions/pull/135)

- Fixed issue with missing obs due to dropping NaNs from other variables in `fp_data` (e.g. `wind_speed`). [#PR 132](https://github.com/openghg/openghg_inversions/pull/132)
17 changes: 16 additions & 1 deletion README.md
@@ -139,7 +139,7 @@ This is not a comprehensive list (see the docstring for `fixedbasisMCMC` in the
- The default value is `False`.
- If `True`, the arviz `InferenceData` output from sampling will be saved to the output path of the inversion, with a file name of the form `f"{outputname}{start_date}_trace.nc"`. To load this trace into arviz, you need to use `InferenceData.from_netcdf`.
- Alternatively, you can pass a path (including filename), and that path will be used.
- `averaging_error`: if `True`, the error from resampling to the given `averaging_period` will be added to the observation's error. (Note: currently this doesn't work correctly, see [GH issue #42](https://github.com/openghg/openghg_inversions/issues/42).)
- `averaging_error`: if `True`, the error from resampling to the given `averaging_period` will be added to the observation's error.
- `use_bc`: defaults to `True`. If `False`, no boundary conditions will be used in the inversion. This implicitly assumes that contributions from the boundary have been subtracted from the observations.
- `fix_basis_outer_regions`:
- Default value is `False`
@@ -165,6 +165,21 @@ These parameters include:
- the prior modelled enhancement (if `False`)
- `no_model_error`: if `True`, only use obs error in likelihood (omitting min. model error and model error from scaling pollution events).
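
Below is a minimal, hypothetical sketch of how these keyword options might be passed to `fixedbasisMCMC`. Only the option names described above come from this repository; the remaining argument names and values are assumptions for illustration, so check the `fixedbasisMCMC` docstring for the real signature.

```python
from openghg_inversions.hbmcmc.hbmcmc import fixedbasisMCMC

# Hypothetical call: only the keyword options documented above are taken from
# this README; the other argument names and values are illustrative assumptions.
results = fixedbasisMCMC(
    species="ch4",
    sites=["MHD", "TAC"],
    start_date="2019-01-01",
    end_date="2019-02-01",
    outputname="ch4_test_",
    outputpath="/path/to/output",
    averaging_error=True,   # add resampling variability to the obs error
    use_bc=True,            # include boundary conditions in the inversion
    no_model_error=False,   # keep min. model error and pollution-event scaling
)
```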


### The output from inversions

The results of an inversion are returned as an xarray `Dataset`.

The dimension `nmeasure` indexes the observations from all sites, stacked into a single 1D array (one entry per observation time).

TODO: complete this part

- `Yerror`: obs. error used in the inversion; if `averaging_error` is `True`, this contains the combined "repeatability" and "variability"; otherwise, it contains just "repeatability", if available, or "variability" if not.
- `Yerror_repeatability`: obs. repeatability. If repeatability isn't available for some sites, then this is filled with zeros.
- `Yerror_variability`: obs. variability.
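
As a sketch (not part of this PR), these variables can be inspected with xarray; the file name below is made up, and the quadrature relation follows the `add_obs_error` logic added in `get_data.py` (it holds when averaging error is enabled):

```python
import numpy as np
import xarray as xr

# Open an inversion output file (the name here is illustrative).
outds = xr.load_dataset("ch4_test_2019-01-01_output.nc")

# All obs-space variables share the stacked `nmeasure` dimension.
print(outds["Yerror"].dims)  # ('nmeasure',)

# With averaging error enabled, Yerror is repeatability and variability
# combined in quadrature (see `add_obs_error` in get_data.py).
combined = np.sqrt(outds["Yerror_repeatability"] ** 2 + outds["Yerror_variability"] ** 2)
np.testing.assert_allclose(outds["Yerror"].values, combined.values)
```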



## Contributing

To contribute to `openghg_inversions`, you should also install the developer packages:
100 changes: 70 additions & 30 deletions openghg_inversions/get_data.py
@@ -14,6 +14,7 @@
This module also includes functions for saving and loading "merged data" created
by the data processing functions.
"""
import logging
import pickle
from collections import defaultdict
from pathlib import Path
@@ -27,7 +28,71 @@
from openghg.types import SearchError
from openghg.util import timestamp_now

import openghg_inversions.hbmcmc.inversionsetup as setup

logger = logging.getLogger(__name__)


def add_obs_error(sites: list[str], fp_all: dict, add_averaging_error: bool = True) -> None:
"""Create `mf_error` variable that contains either `mf_repeatablility`, `mf_variability`
or the square root of the sum of the squares of both, if `add_averaging_error` is True.

This function modifies `fp_all` in place, adding `mf_error` and making sure that both
`mf_repeatability` and `mf_variability` are present.

Note: if `averaging_period` is specified in `data_processing_surface_notracer`, then OpenGHG
will add an `mf_variability` variable with the standard deviation of the obs over the specified
period. If `mf_variability` is already present (for instance, for Picarro data), then the existing
variable is over-written. If the `averaging_period` matches the frequency of the data, this will
make `mf_variability` zero (since the stdev of one value is 0).

Args:
sites: list of site names to process
fp_all: dictionary of `ModelScenario` objects, keyed by site names
add_averaging_error: if True, combine repeatability and variability to make `mf_error`
variable. Otherwise, `mf_error` will equal `mf_repeatability` if it is present, otherwise
it will equal `mf_variability`.

Returns:
None, modifies `fp_all` in place.
"""
# TODO: do we want to fill missing values in repeatability or variability?
for site in sites:
ds = fp_all[site]

variability_missing = False
if "mf_variability" not in ds:
ds["mf_variability"] = xr.zeros_like(ds.mf)
variability_missing = True

if "mf_repeatability" not in ds:
if variability_missing:
raise ValueError(f"Obs data for site {site} is missing both repeatability and variability.")

ds["mf_repeatability"] = xr.zeros_like(ds.mf_variability)
ds["mf_error"] = ds["mf_variability"]

if add_averaging_error:
logger.info(
"`mf_repeatability` not present; using `mf_variability` for `mf_error` at site %s", site
)

else:
if add_averaging_error:
ds["mf_error"] = np.sqrt(ds["mf_repeatability"] ** 2 + ds["mf_variability"] ** 2)
else:
ds["mf_error"] = ds["mf_repeatability"]

# warnings/info for debugging
err0 = ds["mf_error"] == 0

if err0.any():
percent0 = 100 * err0.mean()
logger.warning("`mf_error` is zero for %.0f percent of times at site %s.", percent0, site)
info_msg = (
"If `averaging_period` matches the frequency of the obs data, then `mf_variability` "
"will be zero. Try setting `averaging_period = None`."
)
logger.info(info_msg)


def data_processing_surface_notracer(
@@ -130,8 +195,7 @@ def data_processing_surface_notracer(
Optional name used to create merged data name.
"""

for i, site in enumerate(sites):
sites[i] = site.upper()
sites = [site.upper() for site in sites]

# Convert 'None' args to list
nsites = len(sites)
@@ -279,7 +343,7 @@

if species.lower() == "co2":
model_scenario_dict["mf_mod_high_res_" + source] = scenario_sector["mf_mod_high_res"]
elif species.lower() != "co2":
else:
model_scenario_dict["mf_mod_" + source] = scenario_sector["mf_mod"]

scenario_combined = model_scenario.footprints_data_merge(recalculate=True)
@@ -325,32 +389,8 @@
fp_all[".scales"] = scales
fp_all[".units"] = float(scenario_combined.mf.units)

# If site contains measurement errors given as repeatability and variability,
# use variability to replace missing repeatability values, then drop variability
for site in sites:
if "mf_variability" in fp_all[site] and "mf_repeatability" in fp_all[site]:
fp_all[site]["mf_repeatability"][np.isnan(fp_all[site]["mf_repeatability"])] = fp_all[site][
"mf_variability"
][
np.logical_and(
np.isfinite(fp_all[site]["mf_variability"]), np.isnan(fp_all[site]["mf_repeatability"])
)
]
fp_all[site] = fp_all[site].drop_vars("mf_variability")

# Add measurement variability in averaging period to measurement error
if averagingerror:
fp_all = setup.addaveragingerror(
fp_all,
sites,
species,
start_date,
end_date,
averaging_period,
inlet=inlet,
instrument=instrument,
store=obs_store,
)
# create `mf_error`
add_obs_error(sites, fp_all, add_averaging_error=averagingerror)

if save_merged_data:
if merged_data_dir is None:
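
A small, self-contained usage sketch of the new `add_obs_error` function with synthetic data (not part of the PR; the site name and values are made up):

```python
import pandas as pd
import xarray as xr

from openghg_inversions.get_data import add_obs_error

time = pd.date_range("2019-01-01", periods=4, freq="1h")
ds = xr.Dataset(
    {
        "mf": ("time", [1900.0, 1910.0, 1905.0, 1920.0]),
        "mf_repeatability": ("time", [0.3, 0.3, 0.3, 0.3]),
        "mf_variability": ("time", [0.4, 0.4, 0.4, 0.4]),
    },
    coords={"time": time},
)
fp_all = {"MHD": ds}

# Combine repeatability and variability in quadrature into `mf_error`.
add_obs_error(["MHD"], fp_all, add_averaging_error=True)
print(fp_all["MHD"]["mf_error"].values)  # sqrt(0.3**2 + 0.4**2) = 0.5 everywhere

# With add_averaging_error=False, `mf_error` falls back to the repeatability.
add_obs_error(["MHD"], fp_all, add_averaging_error=False)
print(fp_all["MHD"]["mf_error"].values)  # 0.3 everywhere
```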
33 changes: 24 additions & 9 deletions openghg_inversions/hbmcmc/hbmcmc.py
@@ -81,15 +81,23 @@ def residual_error_method(ds_dict: dict[str, xr.Dataset], average_over: Optional
# if "bc_mod" is present, we need to add it to "mf_mod"
if all("bc_mod" in v for k, v in ds_dict.items() if not k.startswith(".")):
ds = xr.concat(
[v[["mf", "bc_mod", "mf_mod"]].expand_dims({"site": [k]}) for k, v in ds_dict.items() if not k.startswith(".")],
[
v[["mf", "bc_mod", "mf_mod"]].expand_dims({"site": [k]})
for k, v in ds_dict.items()
if not k.startswith(".")
],
dim="site",
)

scaling_factor = float(ds.mf.units)/float(ds.bc_mod.units)
scaling_factor = float(ds.mf.units) / float(ds.bc_mod.units)
ds["modelled_obs"] = ds.mf_mod + ds.bc_mod / scaling_factor
else:
ds = xr.concat(
[v[["mf", "mf_mod"]].expand_dims({"site": [k]}) for k, v in ds_dict.items() if not k.startswith(".")],
[
v[["mf", "mf_mod"]].expand_dims({"site": [k]})
for k, v in ds_dict.items()
if not k.startswith(".")
],
dim="site",
)
ds["modelled_obs"] = ds.mf_mod
@@ -495,24 +503,29 @@ def fixedbasisMCMC(
if use_tracer is False:
# Get inputs ready
error = np.zeros(0)
obs_repeatability = np.zeros(0)
obs_variability = np.zeros(0)
Hx = np.zeros(0)
Y = np.zeros(0)
siteindicator = np.zeros(0)

for si, site in enumerate(sites):
# select variables to drop NaNs from
drop_vars = []
for var in ["H", "H_bc", "mf", "mf_variability", "mf_repeatability"]:
for var in ["H", "H_bc", "mf", "mf_error", "mf_variability", "mf_repeatability"]:
if var in fp_data[site].data_vars:
drop_vars.append(var)

# pymc doesn't like NaNs, so drop them for the variables used below
fp_data[site] = fp_data[site].dropna("time", subset=drop_vars)

if "mf_repeatability" in fp_data[site]:
error = np.concatenate((error, fp_data[site].mf_repeatability.values))
if "mf_variability" in fp_data[site]:
error = np.concatenate((error, fp_data[site].mf_variability.values))
# repeatability/variability chosen/combined into mf_error in `get_data.py`
error = np.concatenate((error, fp_data[site].mf_error.values))

# make repeatability and variability for outputs (not used directly in inversions)
obs_repeatability = np.concatenate((obs_repeatability, fp_data[site].mf_repeatability.values))
obs_variability = np.concatenate((obs_variability, fp_data[site].mf_variability.values))


Y = np.concatenate((Y, fp_data[site].mf.values))
siteindicator = np.concatenate((siteindicator, np.ones_like(fp_data[site].mf.values) * si))
@@ -603,6 +616,8 @@
"emissions_name": emissions_name,
"emissions_store": emissions_store,
"country_file": country_file,
"obs_repeatability": obs_repeatability,
"obs_variability": obs_variability,
}

# add mcmc_args to post_process_args
@@ -666,7 +681,7 @@ def isFloat(string):
except ValueError:
return False

ds_in = setup.opends(input_file)
ds_in = xr.load_dataset(input_file)

# Read inputs from ncdf output
start_date = ds_in.attrs["Start date"]
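
For reference, a toy sketch (values made up) of the stacking pattern used in the `fixedbasisMCMC` changes above, where per-site arrays are concatenated into a single 1D `nmeasure` array alongside a site indicator:

```python
import numpy as np

# Per-site obs errors after NaN-dropping (values are illustrative).
site_errors = {"MHD": np.array([0.5, 0.6, 0.55]), "TAC": np.array([0.7, 0.65])}

error = np.zeros(0)
siteindicator = np.zeros(0)

for si, err in enumerate(site_errors.values()):
    # Stack each site's values into one 1D array ("nmeasure") and record
    # which site each measurement came from.
    error = np.concatenate((error, err))
    siteindicator = np.concatenate((siteindicator, np.ones_like(err) * si))

print(error)          # [0.5  0.6  0.55 0.7  0.65]
print(siteindicator)  # [0. 0. 0. 1. 1.]
```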
8 changes: 7 additions & 1 deletion openghg_inversions/hbmcmc/inversion_pymc.py
@@ -407,6 +407,8 @@ def inferpymc_postprocessouts(
YBCtrace: Optional[np.ndarray] = None,
bcouts: Optional[np.ndarray] = None,
Hbc: Optional[np.ndarray] = None,
obs_repeatability: Optional[np.ndarray] = None,
obs_variability: Optional[np.ndarray] = None,
fp_data=None,
country_file=None,
add_offset=False,
@@ -745,6 +747,8 @@
data_vars = {
"Yobs": (["nmeasure"], Y),
"Yerror": (["nmeasure"], error),
"Yerror_repeatability": (["nmeasure"], obs_repeatability),
"Yerror_variability": (["nmeasure"], obs_variability),
"Ytime": (["nmeasure"], Ytime),
"Yapriori": (["nmeasure"], Yapriori),
"Ymodmean": (["nmeasure"], Ymodmu),
@@ -813,6 +817,9 @@
outds.fluxmode.attrs["units"] = "mol/m2/s"
outds.fluxapriori.attrs["units"] = "mol/m2/s"
outds.Yobs.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yerror.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yerror_repeatability.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yerror_variability.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yapriori.attrs["units"] = obs_units + " " + "mol/mol"
outds.Ymodmean.attrs["units"] = obs_units + " " + "mol/mol"
outds.Ymodmedian.attrs["units"] = obs_units + " " + "mol/mol"
@@ -824,7 +831,6 @@
outds.Yoffmode.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yoff95.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yoff68.attrs["units"] = obs_units + " " + "mol/mol"
outds.Yerror.attrs["units"] = obs_units + " " + "mol/mol"
outds.countrymean.attrs["units"] = country_units
outds.countrymedian.attrs["units"] = country_units
outds.countrymode.attrs["units"] = country_units
93 changes: 0 additions & 93 deletions openghg_inversions/hbmcmc/inversionsetup.py
@@ -12,99 +12,6 @@

import numpy as np
import pandas as pd
import xarray as xr
from openghg.retrieve import get_obs_surface


def opends(fn):
"""
Open a netcdf dataset with xarray
-----------------------------------
Args:
fn (str):
Netcdf file to be opened

Returns:
xarray.Dataset:
netcdf file as dataset
-----------------------------------
"""
with xr.open_dataset(fn) as load:
ds = load.load()

return ds


def addaveragingerror(
fp_all, sites, species, start_date, end_date, meas_period, inlet=None, instrument=None, store=None
):
"""
Adds the variablility within the averaging period to the mole fraction error.
-----------------------------------
Args:
fp_all (dict):
Output from ModelScenario
sites (list):
List of site names
species (str):
Species of interest
start_date (str):
Start time of inversion "YYYY-mm-dd"
end_date (str):
End time of inversion "YYYY-mm-dd"
meas_period (list):
Averaging period of measurements
inlet (list, optional):
Specific inlet height for the site (must match number of sites).
instrument (list):
Specific instrument for the site (must match number of sites).

Returns:
fp_all (dict):
fp_all from input with averaging error added to the mole fraction
error
-----------------------------------
"""
# Add variability in measurement averaging period to repeatability
for i, site in enumerate(sites):
get_obs = get_obs_surface(
site=site,
species=species,
inlet=inlet[i],
instrument=instrument[i],
average=meas_period[i],
start_date=start_date,
end_date=end_date,
store=store,
)

sitedataerr = pd.DataFrame(get_obs.data.mf, index=get_obs.data.time.values)

if min(sitedataerr.index) > pd.to_datetime(start_date):
sitedataerr.loc[pd.to_datetime(start_date)] = [np.nan for col in sitedataerr.columns]
# Pad with an empty entry at the end date
if max(sitedataerr.index) < pd.to_datetime(end_date):
sitedataerr.loc[pd.to_datetime(end_date)] = [np.nan for col in sitedataerr.columns]
# Now sort to get everything in the right order
sitedataerr = sitedataerr.sort_index()
if "mf_variability" in fp_all[site.upper()]:
fp_all[site.upper()].mf_variability.values = np.squeeze(
np.sqrt(
fp_all[site.upper()].mf_variability.values ** 2
+ sitedataerr.resample(meas_period[i]).std(ddof=0).dropna().values.T ** 2
)
)
elif "mf_repeatability" in fp_all[site]:
fp_all[site.upper()].mf_repeatability.values = np.squeeze(
np.sqrt(
fp_all[site.upper()].mf_repeatability.values ** 2
+ sitedataerr.resample(meas_period[i]).std(ddof=0).dropna().values.T ** 2
)
)
else:
print("No mole fraction error information available in {}.".format("fp_all" + str([site])))

return fp_all


def monthly_bcs(start_date, end_date, site, fp_data):