diff --git a/tests/test_volume.py b/tests/test_volume.py new file mode 100644 index 00000000..9897b971 --- /dev/null +++ b/tests/test_volume.py @@ -0,0 +1,159 @@ +import geoutils as gu +import numpy as np +import pandas as pd + +import xdem + +xdem.examples.download_longyearbyen_examples(overwrite=False) + + +class TestLocalHypsometric: + """Test cases for the local hypsometric method.""" + + # Load example data. + dem_2009 = gu.georaster.Raster(xdem.examples.FILEPATHS["longyearbyen_ref_dem"]) + dem_1990 = gu.georaster.Raster(xdem.examples.FILEPATHS["longyearbyen_tba_dem"]).reproject(dem_2009) + outlines = gu.geovector.Vector(xdem.examples.FILEPATHS["longyearbyen_glacier_outlines"]) + # Filter to only look at the Scott Turnerbreen glacier + outlines.ds = outlines.ds.loc[outlines.ds["NAME"] == "Scott Turnerbreen"] + # Create a mask where glacier areas are True + mask = outlines.create_mask(dem_2009) == 255 + + def test_bin_ddem(self): + """Test dDEM binning.""" + ddem = self.dem_2009.data - self.dem_1990.data + + ddem_bins = xdem.volume.hypsometric_binning( + ddem.squeeze()[self.mask], + self.dem_2009.data.squeeze()[self.mask], + bins=50, + kind="fixed") + + ddem_bins_masked = xdem.volume.hypsometric_binning( + np.ma.masked_array(ddem.squeeze(), mask=~self.mask), + np.ma.masked_array(self.dem_2009.data.squeeze(), mask=~self.mask) + ) + + ddem_stds = xdem.volume.hypsometric_binning( + ddem.squeeze()[self.mask], + self.dem_2009.data.squeeze()[self.mask], + aggregation_function=np.std + ) + assert ddem_stds["value"].mean() < 50 + assert np.abs(np.mean(ddem_bins["value"] - ddem_bins_masked["value"])) < 0.01 + + def test_interpolate_ddem_bins(self) -> pd.Series: + """Test dDEM bin interpolation.""" + ddem = self.dem_2009.data - self.dem_1990.data + + ddem_bins = xdem.volume.hypsometric_binning(ddem.squeeze()[self.mask], self.dem_2009.data.squeeze()[self.mask]) + + # Simulate a missing bin + ddem_bins.iloc[3, :] = np.nan + + # Interpolate the bins and exclude bins with low pixel counts from the interpolation. + interpolated_bins = xdem.volume.interpolate_hypsometric_bins(ddem_bins, count_threshold=200) + + assert abs(np.mean(interpolated_bins)) < 40 + assert abs(np.mean(interpolated_bins)) > 0 + assert not np.any(np.isnan(interpolated_bins)) + return interpolated_bins + + def test_area_calculation(self): + """Test the area calculation function.""" + ddem_bins = self.test_interpolate_ddem_bins() + + # Test the area calculation with normal parameters. + bin_area = xdem.volume.calculate_hypsometry_area( + ddem_bins, + self.dem_2009.data.squeeze()[self.mask], + pixel_size=self.dem_2009.res[0] + ) + + # Test area calculation with differing pixel x/y resolution. + # Also test that ddem_bins can be a DataFrame (as long as the column name 'value' exists) + ddem_bins.name = "value" + xdem.volume.calculate_hypsometry_area( + ddem_bins.to_frame(), + self.dem_2009.data.squeeze()[self.mask], + pixel_size=(self.dem_2009.res[0], self.dem_2009.res[0] + 1) + ) + + # Add some nans to the reference DEM + data_with_nans = self.dem_2009.data.squeeze()[self.mask] + data_with_nans[[2, 5]] = np.nan + + # Make sure that the above results in the correct error. + try: + xdem.volume.calculate_hypsometry_area( + ddem_bins, + data_with_nans, + pixel_size=self.dem_2009.res[0] + ) + except AssertionError as exception: + if "DEM has NaNs" not in str(exception): + raise exception + + # Try to pass an incorrect timeframe= parameter + try: + xdem.volume.calculate_hypsometry_area( + ddem_bins, + self.dem_2009.data.squeeze()[self.mask], + pixel_size=self.dem_2009.res[0], + timeframe="blergh" + ) + except ValueError as exception: + if "Argument 'timeframe='blergh'' is invalid" not in str(exception): + raise exception + + # Mess up the dDEM bins and see if it gives the correct error + ddem_bins.iloc[3] = np.nan + try: + xdem.volume.calculate_hypsometry_area( + ddem_bins, + self.dem_2009.data.squeeze()[self.mask], + pixel_size=self.dem_2009.res[0] + ) + except AssertionError as exception: + if "cannot contain NaNs" not in str(exception): + raise exception + + # The area of Scott Turnerbreen was around 3.4 kmĀ² in 1990, so this should be close to that number. + assert 2e6 < bin_area.sum() < 5e6 + + def test_ddem_bin_methods(self): + """Test different dDEM binning methods.""" + ddem = self.dem_2009.data - self.dem_1990.data + + # equal height is already tested in test_bin_ddem + + # Make a fixed amount of bins + equal_count_bins = xdem.volume.hypsometric_binning( + ddem.squeeze()[self.mask], + self.dem_2009.data.squeeze()[self.mask], + bins=50, + kind="count" + ) + assert equal_count_bins.shape[0] == 50 + + # Make 50 bins with approximately the same area (pixel count) + quantile_bins = xdem.volume.hypsometric_binning( + ddem.squeeze()[self.mask], + self.dem_2009.data.squeeze()[self.mask], + bins=50, + kind="quantile" + ) + + assert quantile_bins.shape[0] == 50 + # Make sure that the pixel count variation is low. + assert quantile_bins["count"].std() < 1 + + # Try to feed the previous bins as "custom" bins to the function. + custom_bins = xdem.volume.hypsometric_binning( + ddem.squeeze()[self.mask], + self.dem_2009.data.squeeze()[self.mask], + bins=np.r_[quantile_bins.index.left[0], quantile_bins.index.right], + kind="custom" + ) + + assert custom_bins.shape[0] == quantile_bins.shape[0] diff --git a/xdem/__init__.py b/xdem/__init__.py index e0b79e8f..13962de0 100644 --- a/xdem/__init__.py +++ b/xdem/__init__.py @@ -1,5 +1 @@ -from . import coreg -from . import spatial_tools -from . import dem -from . import examples -from . import spstats +from . import coreg, dem, examples, spatial_tools, volume, spstats diff --git a/xdem/volume.py b/xdem/volume.py new file mode 100644 index 00000000..1782ffa5 --- /dev/null +++ b/xdem/volume.py @@ -0,0 +1,171 @@ +"""Functions to calculate changes in volume (aimed for glaciers).""" +from __future__ import annotations + +from typing import Callable, Optional, Union + +import numpy as np +import pandas as pd +import scipy.interpolate + + +def hypsometric_binning(ddem: np.ndarray, ref_dem: np.ndarray, bins: Union[float, np.ndarray] = 50.0, + kind: str = "fixed", aggregation_function: Callable = np.median) -> pd.DataFrame: + """ + Separate the dDEM in discrete elevation bins. + + It is assumed that the dDEM is calculated as 'ref_dem - dem' (not 'dem - ref_dem'). + + :param ddem: The dDEM as a 2D or 1D array. + :param ref_dem: The reference DEM as a 2D or 1D array. + :param bins: The bin size, count, or array, depending on the binning method ('kind'). + :param kind: The kind of binning to do. Choices: ['fixed', 'count', 'quantile', 'custom']. + :param aggregation_function: The function to aggregate the elevation values within a bin. Defaults to the median. + + :returns: A Pandas DataFrame with elevation bins and dDEM statistics. + """ + assert ddem.shape == ref_dem.shape + + # Remove all nans, and flatten the inputs. + nan_mask = np.logical_or( + np.isnan(ddem) if not isinstance(ddem, np.ma.masked_array) else ddem.mask, + np.isnan(ref_dem) if not isinstance(ref_dem, np.ma.masked_array) else ref_dem.mask + ) + # Extract only the valid values and (if relevant) convert from masked_array to array + ddem = np.array(ddem[~nan_mask]) + ref_dem = np.array(ref_dem[~nan_mask]) + + # If the bin size should be seen as a percentage. + if kind == "fixed": + zbins = np.arange(ref_dem.min(), ref_dem.max() + bins, step=bins) + elif kind == "count": + # Make bins between mean_dem.min() and a little bit above mean_dem.max(). + # The bin count has to be bins + 1 because zbins[0] will be a "below min value" bin, which will be irrelevant. + zbins = np.linspace(ref_dem.min(), ref_dem.max() + 1e-6 / bins, num=int(bins + 1)) + elif kind == "quantile": + # Make the percentile steps. The bins + 1 is explained above. + steps = np.linspace(0, 100, num=int(bins) + 1) + zbins = np.fromiter( + (np.percentile(ref_dem, step) for step in steps), + dtype=float + ) + # The uppermost bin needs to be a tiny amount larger than the highest value to include it. + zbins[-1] += 1e-6 + elif kind == "custom": + zbins = bins + else: + raise ValueError(f"Invalid bin kind: {kind}. Choices: ['fixed', 'count', 'quantile', 'custom']") + + # Generate bins and get bin indices from the mean DEM + indices = np.digitize(ref_dem, bins=zbins) + + # Calculate statistics for each bin. + # If no values exist, all stats should be nans (except count with should be 0) + # medians, means, stds, nmads = (np.zeros(shape=bins.shape[0] - 1, dtype=ddem.dtype) * np.nan, ) * 4 + values = np.zeros(shape=zbins.shape[0] - 1, dtype=ddem.dtype) * np.nan + counts = np.zeros_like(values, dtype=int) + for i in np.arange(indices.min(), indices.max() + 1): + values_in_bin = ddem[indices == i] + # Skip if no values are in the bin. + if values_in_bin.shape[0] == 0: + continue + + values[i - 1] = aggregation_function(values_in_bin) + # medians[i - 1] = np.median(values_in_bin) + # means[i - 1] = np.mean(values_in_bin) + # stds[i - 1] = np.std(values_in_bin) + counts[i - 1] = values_in_bin.shape[0] + # nmads[i - 1] = xdem.spatial_tools.nmad(values_in_bin) + + # Collect the results in a dataframe + output = pd.DataFrame( + index=pd.IntervalIndex.from_breaks(zbins), + data=np.vstack([ + values, counts + ]).T, + columns=["value", "count"] + ) + + return output + + +def interpolate_hypsometric_bins(hypsometric_bins: pd.DataFrame, value_column="value", method="polynomial", order=3, + count_threshold: Optional[int] = None) -> pd.Series: + """ + Interpolate hypsometric bins using any valid Pandas interpolation technique. + + NOTE: It will not extrapolate! + + :param hypsometric_bins: Bins where nans will be interpolated. + :param value_column: The name of the column in 'hypsometric_bins' to use as values. + :param method: Any valid Pandas interpolation technique (e.g. 'linear', 'polynomial', 'ffill', 'bfill'). + :param order: The polynomial order to use. Only used if method='polynomial'. + :param count_threshold: Optional. A pixel count threshold to exclude during the curve fit (requires a 'count' column). + :returns: Bins interpolated with the chosen interpolation method. + """ + bins = hypsometric_bins.copy() + bins.index = bins.index.mid + + if count_threshold is not None: + assert "count" in hypsometric_bins.columns, f"'count' not a column in the dataframe" + bins_under_threshold = bins["count"] < count_threshold + bins.loc[bins_under_threshold, value_column] = np.nan + + interpolated_values = bins[value_column].interpolate(method=method, order=order, limit_direction="both") + + if count_threshold is not None: + interpolated_values.loc[bins_under_threshold] = hypsometric_bins.loc[bins_under_threshold.values, value_column] + interpolated_values.index = hypsometric_bins.index + + return interpolated_values + + +def calculate_hypsometry_area(ddem_bins: Union[pd.Series, pd.DataFrame], ref_dem: np.ndarray, + pixel_size: Union[float, tuple[float, float]], + timeframe: str = "reference") -> pd.Series: + """ + Calculate the associated representative area of the given dDEM bins. + + By default, the area bins will be representative of the mean timing between the reference and nonreference DEM: + elevations = ref_dem - (h_vs_dh_funcion(ref_dem) / 2) + This can be changed to either "reference": + elevations = ref_dem + Or "nonreference": + elevations = ref_dem - h_vs_dh_function(ref_dem) + + :param ddem_bins: A Series or DataFrame of dDEM values. If a DataFrame is given, the column 'value' will be used. + :param ref_dem: The reference DEM. This should not have any NaNs. + :param pixel_size: The xy or (x, y) size of the reference DEM pixels in georeferenced coordinates. + :param timeframe: The time at which the area bins are representative. Choices: "reference", "nonreference", "mean" + + :returns: The representative area within the given dDEM bins. + """ + assert not np.any(np.isnan(ref_dem)), "The given reference DEM has NaNs. No NaNs are allowed to calculate area!" + + if timeframe not in ["reference", "nonreference", "mean"]: + raise ValueError(f"Argument '{timeframe=}' is invalid. Choices: ['reference', 'nonreference', 'mean']") + + if isinstance(ddem_bins, pd.DataFrame): + ddem_bins = ddem_bins["value"] + assert not np.any(np.isnan(ddem_bins.values)), "The dDEM bins cannot contain NaNs. Remove or fill them first." + # Generate a continuous elevation vs. dDEM function + ddem_func = scipy.interpolate.interp1d(ddem_bins.index.mid, ddem_bins.values, + kind="linear", fill_value="extrapolate") + # Generate average elevations by subtracting half of the dDEM's values to the reference DEM + if timeframe == "mean": + elevations = ref_dem - (ddem_func(ref_dem.data) / 2) + elif timeframe == "reference": + elevations = ref_dem + elif timeframe == "nonreference": + elevations = ref_dem - ddem_func(ref_dem.data) + + # Extract the bins from the dDEM series and compute the frequency of points in the bins. + bins = np.r_[[ddem_bins.index.left[0]], ddem_bins.index.right] + bin_counts = np.histogram(elevations, bins=bins)[0] + + # Multiply the bin counts with the pixel area to get the full area. + bin_area = bin_counts * (pixel_size ** 2 if not isinstance(pixel_size, tuple) else pixel_size[0] * pixel_size[1]) + + # Put this in a series which will be returned. + output = pd.Series(index=ddem_bins.index, data=bin_area) + + return output