-
Notifications
You must be signed in to change notification settings - Fork 42
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #36 from erikmannerfelt/local_hypsometric
Local hypsometric interpolation
- Loading branch information
Showing
3 changed files
with
331 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
import geoutils as gu | ||
import numpy as np | ||
import pandas as pd | ||
|
||
import xdem | ||
|
||
xdem.examples.download_longyearbyen_examples(overwrite=False) | ||
|
||
|
||
class TestLocalHypsometric: | ||
"""Test cases for the local hypsometric method.""" | ||
|
||
# Load example data. | ||
dem_2009 = gu.georaster.Raster(xdem.examples.FILEPATHS["longyearbyen_ref_dem"]) | ||
dem_1990 = gu.georaster.Raster(xdem.examples.FILEPATHS["longyearbyen_tba_dem"]).reproject(dem_2009) | ||
outlines = gu.geovector.Vector(xdem.examples.FILEPATHS["longyearbyen_glacier_outlines"]) | ||
# Filter to only look at the Scott Turnerbreen glacier | ||
outlines.ds = outlines.ds.loc[outlines.ds["NAME"] == "Scott Turnerbreen"] | ||
# Create a mask where glacier areas are True | ||
mask = outlines.create_mask(dem_2009) == 255 | ||
|
||
def test_bin_ddem(self): | ||
"""Test dDEM binning.""" | ||
ddem = self.dem_2009.data - self.dem_1990.data | ||
|
||
ddem_bins = xdem.volume.hypsometric_binning( | ||
ddem.squeeze()[self.mask], | ||
self.dem_2009.data.squeeze()[self.mask], | ||
bins=50, | ||
kind="fixed") | ||
|
||
ddem_bins_masked = xdem.volume.hypsometric_binning( | ||
np.ma.masked_array(ddem.squeeze(), mask=~self.mask), | ||
np.ma.masked_array(self.dem_2009.data.squeeze(), mask=~self.mask) | ||
) | ||
|
||
ddem_stds = xdem.volume.hypsometric_binning( | ||
ddem.squeeze()[self.mask], | ||
self.dem_2009.data.squeeze()[self.mask], | ||
aggregation_function=np.std | ||
) | ||
assert ddem_stds["value"].mean() < 50 | ||
assert np.abs(np.mean(ddem_bins["value"] - ddem_bins_masked["value"])) < 0.01 | ||
|
||
def test_interpolate_ddem_bins(self) -> pd.Series: | ||
"""Test dDEM bin interpolation.""" | ||
ddem = self.dem_2009.data - self.dem_1990.data | ||
|
||
ddem_bins = xdem.volume.hypsometric_binning(ddem.squeeze()[self.mask], self.dem_2009.data.squeeze()[self.mask]) | ||
|
||
# Simulate a missing bin | ||
ddem_bins.iloc[3, :] = np.nan | ||
|
||
# Interpolate the bins and exclude bins with low pixel counts from the interpolation. | ||
interpolated_bins = xdem.volume.interpolate_hypsometric_bins(ddem_bins, count_threshold=200) | ||
|
||
assert abs(np.mean(interpolated_bins)) < 40 | ||
assert abs(np.mean(interpolated_bins)) > 0 | ||
assert not np.any(np.isnan(interpolated_bins)) | ||
return interpolated_bins | ||
|
||
def test_area_calculation(self): | ||
"""Test the area calculation function.""" | ||
ddem_bins = self.test_interpolate_ddem_bins() | ||
|
||
# Test the area calculation with normal parameters. | ||
bin_area = xdem.volume.calculate_hypsometry_area( | ||
ddem_bins, | ||
self.dem_2009.data.squeeze()[self.mask], | ||
pixel_size=self.dem_2009.res[0] | ||
) | ||
|
||
# Test area calculation with differing pixel x/y resolution. | ||
# Also test that ddem_bins can be a DataFrame (as long as the column name 'value' exists) | ||
ddem_bins.name = "value" | ||
xdem.volume.calculate_hypsometry_area( | ||
ddem_bins.to_frame(), | ||
self.dem_2009.data.squeeze()[self.mask], | ||
pixel_size=(self.dem_2009.res[0], self.dem_2009.res[0] + 1) | ||
) | ||
|
||
# Add some nans to the reference DEM | ||
data_with_nans = self.dem_2009.data.squeeze()[self.mask] | ||
data_with_nans[[2, 5]] = np.nan | ||
|
||
# Make sure that the above results in the correct error. | ||
try: | ||
xdem.volume.calculate_hypsometry_area( | ||
ddem_bins, | ||
data_with_nans, | ||
pixel_size=self.dem_2009.res[0] | ||
) | ||
except AssertionError as exception: | ||
if "DEM has NaNs" not in str(exception): | ||
raise exception | ||
|
||
# Try to pass an incorrect timeframe= parameter | ||
try: | ||
xdem.volume.calculate_hypsometry_area( | ||
ddem_bins, | ||
self.dem_2009.data.squeeze()[self.mask], | ||
pixel_size=self.dem_2009.res[0], | ||
timeframe="blergh" | ||
) | ||
except ValueError as exception: | ||
if "Argument 'timeframe='blergh'' is invalid" not in str(exception): | ||
raise exception | ||
|
||
# Mess up the dDEM bins and see if it gives the correct error | ||
ddem_bins.iloc[3] = np.nan | ||
try: | ||
xdem.volume.calculate_hypsometry_area( | ||
ddem_bins, | ||
self.dem_2009.data.squeeze()[self.mask], | ||
pixel_size=self.dem_2009.res[0] | ||
) | ||
except AssertionError as exception: | ||
if "cannot contain NaNs" not in str(exception): | ||
raise exception | ||
|
||
# The area of Scott Turnerbreen was around 3.4 km² in 1990, so this should be close to that number. | ||
assert 2e6 < bin_area.sum() < 5e6 | ||
|
||
def test_ddem_bin_methods(self): | ||
"""Test different dDEM binning methods.""" | ||
ddem = self.dem_2009.data - self.dem_1990.data | ||
|
||
# equal height is already tested in test_bin_ddem | ||
|
||
# Make a fixed amount of bins | ||
equal_count_bins = xdem.volume.hypsometric_binning( | ||
ddem.squeeze()[self.mask], | ||
self.dem_2009.data.squeeze()[self.mask], | ||
bins=50, | ||
kind="count" | ||
) | ||
assert equal_count_bins.shape[0] == 50 | ||
|
||
# Make 50 bins with approximately the same area (pixel count) | ||
quantile_bins = xdem.volume.hypsometric_binning( | ||
ddem.squeeze()[self.mask], | ||
self.dem_2009.data.squeeze()[self.mask], | ||
bins=50, | ||
kind="quantile" | ||
) | ||
|
||
assert quantile_bins.shape[0] == 50 | ||
# Make sure that the pixel count variation is low. | ||
assert quantile_bins["count"].std() < 1 | ||
|
||
# Try to feed the previous bins as "custom" bins to the function. | ||
custom_bins = xdem.volume.hypsometric_binning( | ||
ddem.squeeze()[self.mask], | ||
self.dem_2009.data.squeeze()[self.mask], | ||
bins=np.r_[quantile_bins.index.left[0], quantile_bins.index.right], | ||
kind="custom" | ||
) | ||
|
||
assert custom_bins.shape[0] == quantile_bins.shape[0] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1 @@ | ||
from . import coreg | ||
from . import spatial_tools | ||
from . import dem | ||
from . import examples | ||
from . import spstats | ||
from . import coreg, dem, examples, spatial_tools, volume, spstats |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
"""Functions to calculate changes in volume (aimed for glaciers).""" | ||
from __future__ import annotations | ||
|
||
from typing import Callable, Optional, Union | ||
|
||
import numpy as np | ||
import pandas as pd | ||
import scipy.interpolate | ||
|
||
|
||
def hypsometric_binning(ddem: np.ndarray, ref_dem: np.ndarray, bins: Union[float, np.ndarray] = 50.0, | ||
kind: str = "fixed", aggregation_function: Callable = np.median) -> pd.DataFrame: | ||
""" | ||
Separate the dDEM in discrete elevation bins. | ||
It is assumed that the dDEM is calculated as 'ref_dem - dem' (not 'dem - ref_dem'). | ||
:param ddem: The dDEM as a 2D or 1D array. | ||
:param ref_dem: The reference DEM as a 2D or 1D array. | ||
:param bins: The bin size, count, or array, depending on the binning method ('kind'). | ||
:param kind: The kind of binning to do. Choices: ['fixed', 'count', 'quantile', 'custom']. | ||
:param aggregation_function: The function to aggregate the elevation values within a bin. Defaults to the median. | ||
:returns: A Pandas DataFrame with elevation bins and dDEM statistics. | ||
""" | ||
assert ddem.shape == ref_dem.shape | ||
|
||
# Remove all nans, and flatten the inputs. | ||
nan_mask = np.logical_or( | ||
np.isnan(ddem) if not isinstance(ddem, np.ma.masked_array) else ddem.mask, | ||
np.isnan(ref_dem) if not isinstance(ref_dem, np.ma.masked_array) else ref_dem.mask | ||
) | ||
# Extract only the valid values and (if relevant) convert from masked_array to array | ||
ddem = np.array(ddem[~nan_mask]) | ||
ref_dem = np.array(ref_dem[~nan_mask]) | ||
|
||
# If the bin size should be seen as a percentage. | ||
if kind == "fixed": | ||
zbins = np.arange(ref_dem.min(), ref_dem.max() + bins, step=bins) | ||
elif kind == "count": | ||
# Make bins between mean_dem.min() and a little bit above mean_dem.max(). | ||
# The bin count has to be bins + 1 because zbins[0] will be a "below min value" bin, which will be irrelevant. | ||
zbins = np.linspace(ref_dem.min(), ref_dem.max() + 1e-6 / bins, num=int(bins + 1)) | ||
elif kind == "quantile": | ||
# Make the percentile steps. The bins + 1 is explained above. | ||
steps = np.linspace(0, 100, num=int(bins) + 1) | ||
zbins = np.fromiter( | ||
(np.percentile(ref_dem, step) for step in steps), | ||
dtype=float | ||
) | ||
# The uppermost bin needs to be a tiny amount larger than the highest value to include it. | ||
zbins[-1] += 1e-6 | ||
elif kind == "custom": | ||
zbins = bins | ||
else: | ||
raise ValueError(f"Invalid bin kind: {kind}. Choices: ['fixed', 'count', 'quantile', 'custom']") | ||
|
||
# Generate bins and get bin indices from the mean DEM | ||
indices = np.digitize(ref_dem, bins=zbins) | ||
|
||
# Calculate statistics for each bin. | ||
# If no values exist, all stats should be nans (except count with should be 0) | ||
# medians, means, stds, nmads = (np.zeros(shape=bins.shape[0] - 1, dtype=ddem.dtype) * np.nan, ) * 4 | ||
values = np.zeros(shape=zbins.shape[0] - 1, dtype=ddem.dtype) * np.nan | ||
counts = np.zeros_like(values, dtype=int) | ||
for i in np.arange(indices.min(), indices.max() + 1): | ||
values_in_bin = ddem[indices == i] | ||
# Skip if no values are in the bin. | ||
if values_in_bin.shape[0] == 0: | ||
continue | ||
|
||
values[i - 1] = aggregation_function(values_in_bin) | ||
# medians[i - 1] = np.median(values_in_bin) | ||
# means[i - 1] = np.mean(values_in_bin) | ||
# stds[i - 1] = np.std(values_in_bin) | ||
counts[i - 1] = values_in_bin.shape[0] | ||
# nmads[i - 1] = xdem.spatial_tools.nmad(values_in_bin) | ||
|
||
# Collect the results in a dataframe | ||
output = pd.DataFrame( | ||
index=pd.IntervalIndex.from_breaks(zbins), | ||
data=np.vstack([ | ||
values, counts | ||
]).T, | ||
columns=["value", "count"] | ||
) | ||
|
||
return output | ||
|
||
|
||
def interpolate_hypsometric_bins(hypsometric_bins: pd.DataFrame, value_column="value", method="polynomial", order=3, | ||
count_threshold: Optional[int] = None) -> pd.Series: | ||
""" | ||
Interpolate hypsometric bins using any valid Pandas interpolation technique. | ||
NOTE: It will not extrapolate! | ||
:param hypsometric_bins: Bins where nans will be interpolated. | ||
:param value_column: The name of the column in 'hypsometric_bins' to use as values. | ||
:param method: Any valid Pandas interpolation technique (e.g. 'linear', 'polynomial', 'ffill', 'bfill'). | ||
:param order: The polynomial order to use. Only used if method='polynomial'. | ||
:param count_threshold: Optional. A pixel count threshold to exclude during the curve fit (requires a 'count' column). | ||
:returns: Bins interpolated with the chosen interpolation method. | ||
""" | ||
bins = hypsometric_bins.copy() | ||
bins.index = bins.index.mid | ||
|
||
if count_threshold is not None: | ||
assert "count" in hypsometric_bins.columns, f"'count' not a column in the dataframe" | ||
bins_under_threshold = bins["count"] < count_threshold | ||
bins.loc[bins_under_threshold, value_column] = np.nan | ||
|
||
interpolated_values = bins[value_column].interpolate(method=method, order=order, limit_direction="both") | ||
|
||
if count_threshold is not None: | ||
interpolated_values.loc[bins_under_threshold] = hypsometric_bins.loc[bins_under_threshold.values, value_column] | ||
interpolated_values.index = hypsometric_bins.index | ||
|
||
return interpolated_values | ||
|
||
|
||
def calculate_hypsometry_area(ddem_bins: Union[pd.Series, pd.DataFrame], ref_dem: np.ndarray, | ||
pixel_size: Union[float, tuple[float, float]], | ||
timeframe: str = "reference") -> pd.Series: | ||
""" | ||
Calculate the associated representative area of the given dDEM bins. | ||
By default, the area bins will be representative of the mean timing between the reference and nonreference DEM: | ||
elevations = ref_dem - (h_vs_dh_funcion(ref_dem) / 2) | ||
This can be changed to either "reference": | ||
elevations = ref_dem | ||
Or "nonreference": | ||
elevations = ref_dem - h_vs_dh_function(ref_dem) | ||
:param ddem_bins: A Series or DataFrame of dDEM values. If a DataFrame is given, the column 'value' will be used. | ||
:param ref_dem: The reference DEM. This should not have any NaNs. | ||
:param pixel_size: The xy or (x, y) size of the reference DEM pixels in georeferenced coordinates. | ||
:param timeframe: The time at which the area bins are representative. Choices: "reference", "nonreference", "mean" | ||
:returns: The representative area within the given dDEM bins. | ||
""" | ||
assert not np.any(np.isnan(ref_dem)), "The given reference DEM has NaNs. No NaNs are allowed to calculate area!" | ||
|
||
if timeframe not in ["reference", "nonreference", "mean"]: | ||
raise ValueError(f"Argument '{timeframe=}' is invalid. Choices: ['reference', 'nonreference', 'mean']") | ||
|
||
if isinstance(ddem_bins, pd.DataFrame): | ||
ddem_bins = ddem_bins["value"] | ||
assert not np.any(np.isnan(ddem_bins.values)), "The dDEM bins cannot contain NaNs. Remove or fill them first." | ||
# Generate a continuous elevation vs. dDEM function | ||
ddem_func = scipy.interpolate.interp1d(ddem_bins.index.mid, ddem_bins.values, | ||
kind="linear", fill_value="extrapolate") | ||
# Generate average elevations by subtracting half of the dDEM's values to the reference DEM | ||
if timeframe == "mean": | ||
elevations = ref_dem - (ddem_func(ref_dem.data) / 2) | ||
elif timeframe == "reference": | ||
elevations = ref_dem | ||
elif timeframe == "nonreference": | ||
elevations = ref_dem - ddem_func(ref_dem.data) | ||
|
||
# Extract the bins from the dDEM series and compute the frequency of points in the bins. | ||
bins = np.r_[[ddem_bins.index.left[0]], ddem_bins.index.right] | ||
bin_counts = np.histogram(elevations, bins=bins)[0] | ||
|
||
# Multiply the bin counts with the pixel area to get the full area. | ||
bin_area = bin_counts * (pixel_size ** 2 if not isinstance(pixel_size, tuple) else pixel_size[0] * pixel_size[1]) | ||
|
||
# Put this in a series which will be returned. | ||
output = pd.Series(index=ddem_bins.index, data=bin_area) | ||
|
||
return output |