diff --git a/docs/source/code/coregistration_plot_nuth_kaab.py b/docs/source/code/coregistration_plot_nuth_kaab.py
index 3fa21369..7fee16a1 100644
--- a/docs/source/code/coregistration_plot_nuth_kaab.py
+++ b/docs/source/code/coregistration_plot_nuth_kaab.py
@@ -17,8 +17,8 @@
 ddem_pre = (dem_2009.data - dem_1990.data).filled(np.nan).squeeze()
 ddem_post = (dem_2009.data - dem_coreg).filled(np.nan).squeeze()
 
-nmad_pre = xdem.spatial_tools.nmad(ddem_pre[inlier_mask.squeeze()])
-nmad_post = xdem.spatial_tools.nmad(ddem_post[inlier_mask.squeeze()])
+nmad_pre = xdem.spatialstats.nmad(ddem_pre[inlier_mask.squeeze()])
+nmad_post = xdem.spatialstats.nmad(ddem_post[inlier_mask.squeeze()])
 
 vlim = 20
 plt.figure(figsize=(8, 5))
diff --git a/examples/plot_blockwise_coreg.py b/examples/plot_blockwise_coreg.py
index 9939a37b..6938e4d1 100644
--- a/examples/plot_blockwise_coreg.py
+++ b/examples/plot_blockwise_coreg.py
@@ -99,5 +99,5 @@
 # %%
 # We can compare the :ref:`spatial_stats_nmad` to validate numerically that there was an improvement:
 
-print(f"Error before: {xdem.spatial_tools.nmad(diff_before):.2f} m")
-print(f"Error after: {xdem.spatial_tools.nmad(diff_after):.2f} m")
+print(f"Error before: {xdem.spatialstats.nmad(diff_before):.2f} m")
+print(f"Error after: {xdem.spatialstats.nmad(diff_after):.2f} m")
diff --git a/examples/plot_norm_regional_hypso.py b/examples/plot_norm_regional_hypso.py
index b20087e7..891a70ad 100644
--- a/examples/plot_norm_regional_hypso.py
+++ b/examples/plot_norm_regional_hypso.py
@@ -108,7 +108,7 @@
 difference = (ddem_filled - ddem).squeeze()[random_nans].filled(np.nan)
 
 median = np.nanmedian(difference)
-nmad = xdem.spatial_tools.nmad(difference)
+nmad = xdem.spatialstats.nmad(difference)
 
 plt.title(f"Median: {median:.2f} m, NMAD: {nmad:.2f} m")
 plt.hist(difference, bins=np.linspace(-15, 15, 100))
diff --git a/examples/plot_nuth_kaab.py b/examples/plot_nuth_kaab.py
index e6130da9..a576b3b2 100644
--- a/examples/plot_nuth_kaab.py
+++ b/examples/plot_nuth_kaab.py
@@ -63,8 +63,8 @@
 # %%
 # We can compare the :ref:`spatial_stats_nmad` to validate numerically that there was an improvement:
 
-print(f"Error before: {xdem.spatial_tools.nmad(diff_before):.2f} m")
-print(f"Error after: {xdem.spatial_tools.nmad(diff_after):.2f} m")
+print(f"Error before: {xdem.spatialstats.nmad(diff_before):.2f} m")
+print(f"Error after: {xdem.spatialstats.nmad(diff_after):.2f} m")
 
 # %%
 # In the plot above, one may notice a positive (blue) tendency toward the east.
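Note: `nmad` is simply relocated from `xdem.spatial_tools` to `xdem.spatialstats`, with unchanged behaviour. A minimal sketch of the renamed call on plain NumPy data (values made up for illustration):

```python
import numpy as np
import xdem

# Hypothetical elevation differences (m); NaNs are ignored by nmad
dh = np.array([0.1, -0.2, 0.3, np.nan, 10.0])

# NMAD = 1.4826 * median(|dh - median(dh)|), robust to the 10 m outlier
print(xdem.spatialstats.nmad(dh))
```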
diff --git a/tests/test_coreg.py b/tests/test_coreg.py
index a8cf7cbb..e646cd59 100644
--- a/tests/test_coreg.py
+++ b/tests/test_coreg.py
@@ -17,7 +17,7 @@
 with warnings.catch_warnings():
     warnings.simplefilter("ignore")
-    from xdem import coreg, examples, spatial_tools, misc
+    from xdem import coreg, examples, spatial_tools, spatialstats, misc
     import xdem
@@ -698,7 +698,7 @@ def test_apply_matrix():
     # Check that the median is very close to zero
     assert np.abs(np.nanmedian(diff)) < 0.01
     # Check that the NMAD is low
-    assert spatial_tools.nmad(diff) < 0.01
+    assert spatialstats.nmad(diff) < 0.01
 
     def rotation_matrix(rotation=30):
         rotation = np.deg2rad(rotation)
@@ -721,7 +721,7 @@ def rotation_matrix(rotation=30):
     )
     # Make sure that the rotated DEM is way off, but is centered around the same approximate point.
     assert np.abs(np.nanmedian(rotated_dem - ref.data.data)) < 1
-    assert spatial_tools.nmad(rotated_dem - ref.data.data) > 500
+    assert spatialstats.nmad(rotated_dem - ref.data.data) > 500
 
     # Apply a rotation in the opposite direction
     unrotated_dem = coreg.apply_matrix(
@@ -775,8 +775,8 @@ def rotation_matrix(rotation=30):
     # Check that the median is very close to zero
     assert np.abs(np.nanmedian(diff)) < 0.5
     # Check that the NMAD is low
-    assert spatial_tools.nmad(diff) < 5
-    print(np.nanmedian(diff), spatial_tools.nmad(diff))
+    assert spatialstats.nmad(diff) < 5
+    print(np.nanmedian(diff), spatialstats.nmad(diff))
 
 
 def test_warp_dem():
@@ -874,7 +874,7 @@ def test_warp_dem():
     )
     # Validate that the DEM is now more or less the same as the original.
     # Due to the randomness, the threshold is quite high, but would be something like 10+ if it was incorrect.
-    assert spatial_tools.nmad(dem - untransformed_dem) < 0.5
+    assert spatialstats.nmad(dem - untransformed_dem) < 0.5
 
     if False:
         import matplotlib.pyplot as plt
diff --git a/tests/test_fit.py b/tests/test_fit.py
new file mode 100644
index 00000000..9dc28aed
--- /dev/null
+++ b/tests/test_fit.py
@@ -0,0 +1,143 @@
+"""
+Functions to test the fitting tools.
+"""
+import numpy as np
+import pandas as pd
+import pytest
+
+import xdem
+
+from sklearn.metrics import mean_squared_error, median_absolute_error
+
+
+class TestRobustFitting:
+
+    @pytest.mark.parametrize("pkg_estimator", [('sklearn', 'Linear'), ('scipy', 'Linear'), ('sklearn', 'Theil-Sen'),
+                                               ('sklearn', 'RANSAC'), ('sklearn', 'Huber')])
+    def test_robust_polynomial_fit(self, pkg_estimator: tuple[str, str]):
+
+        # Define x vector
+        x = np.linspace(1, 10, 1000)
+        # Define an exact polynomial
+        true_coefs = [-100, 5, 3, 2]
+        y = np.polyval(np.flip(true_coefs), x)
+
+        # Run fit
+        coefs, deg = xdem.fit.robust_polynomial_fit(x, y, linear_pkg=pkg_estimator[0],
+                                                    estimator_name=pkg_estimator[1], random_state=42)
+
+        # Check that the coefficients are constrained
+        assert deg == 3 or deg == 4
+        error_margins = [100, 5, 2, 1]
+        for i in range(4):
+            assert coefs[i] == pytest.approx(true_coefs[i], abs=error_margins[i])
+
+    def test_robust_polynomial_fit_noise_and_outliers(self):
+
+        np.random.seed(42)
+
+        # Define x vector
+        x = np.linspace(1, 10, 1000)
+        # Define an exact polynomial
+        true_coefs = [-100, 5, 3, 2]
+        y = np.polyval(np.flip(true_coefs), x)
+        # Add some noise on top
+        y += np.random.normal(loc=0, scale=3, size=1000)
+        # Add some outliers
+        y[50:75] = 0
+        y[900:925] = 1000
+
+        # Run with the "Linear" estimator
+        coefs, deg = xdem.fit.robust_polynomial_fit(x, y, estimator_name='Linear', linear_pkg='scipy',
+                                                    loss='soft_l1', f_scale=0.5)
+
+        # The scipy solution should be quite robust to outliers/noise (with the soft_l1 loss and f_scale parameter).
+        # However, it is subject to random processes inside the scipy function (we could not find how to fix the seed).
+        # It can find a degree 3, or a degree 4 with a coefficient close to 0.
+        assert deg in [3, 4]
+        acceptable_scipy_linear_margins = [3, 3, 1, 1]
+        for i in range(4):
+            assert coefs[i] == pytest.approx(true_coefs[i], abs=acceptable_scipy_linear_margins[i])
+
+        # The sklearn Linear solution with an MSE cost function will not be robust
+        coefs2, deg2 = xdem.fit.robust_polynomial_fit(x, y, estimator_name='Linear', linear_pkg='sklearn',
+                                                      cost_func=mean_squared_error, margin_improvement=50)
+        # It won't find the right degree because of the outliers and noise
+        assert deg2 != 3
+        # Using the median absolute error should improve the fit
+        coefs3, deg3 = xdem.fit.robust_polynomial_fit(x, y, estimator_name='Linear', linear_pkg='sklearn',
+                                                      cost_func=median_absolute_error, margin_improvement=50)
+        # It will find the right degree, but not the right coefficients, because of the outliers and noise
+        assert deg3 == 3
+        sklearn_linear_error = [50, 10, 5, 0.5]
+        for i in range(4):
+            assert np.abs(coefs3[i] - true_coefs[i]) > sklearn_linear_error[i]
+
+        # Now, the robust estimators
+        # Theil-Sen should have better coefficients
+        coefs4, deg4 = xdem.fit.robust_polynomial_fit(x, y, estimator_name='Theil-Sen', random_state=42)
+        assert deg4 == 3
+        # The high-degree coefficients should be well constrained
+        assert coefs4[2] == pytest.approx(true_coefs[2], abs=1)
+        assert coefs4[3] == pytest.approx(true_coefs[3], abs=1)
+
+        # RANSAC is not always optimal; here it does not work well
+        coefs5, deg5 = xdem.fit.robust_polynomial_fit(x, y, estimator_name='RANSAC', random_state=42)
+        assert deg5 != 3
+
+        # Huber should perform well, close to the scipy robust solution
+        coefs6, deg6 = xdem.fit.robust_polynomial_fit(x, y, estimator_name='Huber')
+        assert deg6 == 3
+        for i in range(3):
+            assert coefs6[i + 1] == pytest.approx(true_coefs[i + 1], abs=1)
+
+    @pytest.mark.skip('This test randomly fails in CI: issue opened.')
+    def test_robust_sumsin_fit(self):
+
+        # Define x vector
+        x = np.linspace(0, 10, 1000)
+        # Define an exact sum of sinusoid signal
+        true_coefs = np.array([(5, 1, np.pi), (3, 0.3, 0)]).flatten()
+        y = xdem.fit._sumofsinval(x, params=true_coefs)
+
+        # Check that the function runs
+        coefs, deg = xdem.fit.robust_sumsin_fit(x, y, random_state=42)
+
+        # Check that the estimated sum of sinusoids corresponds to the input
+        for i in range(2):
+            assert coefs[3 * i] == pytest.approx(true_coefs[3 * i], abs=0.02)
+
+        # Check that using custom arguments does not trigger an error
+        bounds = [(3, 7), (0.1, 3), (0, 2 * np.pi), (1, 7), (0.1, 1), (0, 2 * np.pi), (0, 1), (0.1, 1), (0, 2 * np.pi)]
+        coefs, deg = xdem.fit.robust_sumsin_fit(x, y, bounds_amp_freq_phase=bounds, nb_frequency_max=2,
+                                                hop_length=0.01, random_state=42)
+
+    def test_robust_sumsin_fit_noise_and_outliers(self):
+
+        # Check robustness to outliers
+        np.random.seed(42)
+        # Define x vector
+        x = np.linspace(0, 10, 1000)
+        # Define an exact sum of sinusoid signal
+        true_coefs = np.array([(5, 1, np.pi), (3, 0.3, 0)]).flatten()
+        y = xdem.fit._sumofsinval(x, params=true_coefs)
+
+        # Add some noise
+        y += np.random.normal(loc=0, scale=0.25, size=1000)
+        # Add some outliers
+        y[50:75] = -10
+        y[900:925] = 10
+
+        # Define a first guess for the bounds and run
+        bounds = [(3, 7), (0.1, 3), (0, 2 * np.pi), (1, 7), (0.1, 1), (0, 2 * np.pi), (0, 1), (0.1, 1), (0, 2 * np.pi)]
+        coefs, deg = xdem.fit.robust_sumsin_fit(x, y, random_state=42, bounds_amp_freq_phase=bounds)
+
+        # The fit should be less precise, but still on point
+        # We need to re-order the output coefficients to match the input
+        if coefs[3] > coefs[0]:
+            coefs = np.concatenate((coefs[3:], coefs[0:3]))
+
+        # Check the values
+        for i in range(2):
+            assert coefs[3 * i] == pytest.approx(true_coefs[3 * i], abs=0.2)
+            assert coefs[3 * i + 1] == pytest.approx(true_coefs[3 * i + 1], abs=0.2)
+            error_phase = min(np.abs(coefs[3 * i + 2] - true_coefs[3 * i + 2]),
+                              np.abs(2 * np.pi - (coefs[3 * i + 2] - true_coefs[3 * i + 2])))
+            assert error_phase < 0.2
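As context for the comment on scipy's robustness above: the `loss='soft_l1'` and `f_scale` keywords are passed through to `scipy.optimize.least_squares`, whose built-in robust losses down-weight large residuals. A standalone sketch on a toy line fit (data made up):

```python
import numpy as np
import scipy.optimize

rng = np.random.default_rng(42)
x = np.linspace(0, 10, 100)
y = 2 * x + 1 + rng.normal(0, 0.1, 100)
y[::10] = 50  # inject outliers

def residuals(p, xx, yy):
    return p[0] * xx + p[1] - yy

# loss='soft_l1' with f_scale down-weights residuals much larger than f_scale
res = scipy.optimize.least_squares(residuals, x0=[1.0, 0.0], args=(x, y),
                                   loss='soft_l1', f_scale=0.5)
print(res.x)  # close to [2, 1] despite the outliers
```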
diff --git a/tests/test_spatial_tools.py b/tests/test_spatial_tools.py
index be8638cc..27ad04bf 100644
--- a/tests/test_spatial_tools.py
+++ b/tests/test_spatial_tools.py
@@ -2,6 +2,7 @@
 Functions to test the spatial tools.
 """
 from __future__ import annotations
+
 import os
 import shutil
 import subprocess
@@ -27,7 +28,6 @@ def test_dem_subtraction():
     assert np.nanmean(np.abs(diff.data)) < 100
 
-
 class TestMerging:
     """
     Test cases for stacking and merging DEMs
@@ -211,7 +211,6 @@ def test_get_array_and_mask(
     else:
         assert not np.shares_memory(array, arr_view)
 
-
 class TestSubsample:
     """
    Different examples of 1D to 3D arrays with masked values for testing.
@@ -265,4 +264,4 @@ def test_subsample(self, array):
         assert np.ndim(indices) == 2
         assert len(indices) == np.ndim(array)
         assert np.ndim(array[indices]) == 1
-        assert np.size(array[indices]) == int(np.size(array) * 0.3)
+        assert np.size(array[indices]) == int(np.size(array) * 0.3)
\ No newline at end of file
diff --git a/tests/test_terrain.py b/tests/test_terrain.py
index c28307d1..3eddb826 100644
--- a/tests/test_terrain.py
+++ b/tests/test_terrain.py
@@ -66,7 +66,7 @@ def test_attribute_functions(self, attribute: str) -> None:
         diff = (attr_xdem - attr_gdal).filled(np.nan)
         try:
             assert np.nanmean(diff) < 5
-            assert xdem.spatial_tools.nmad(diff) < 5
+            assert xdem.spatialstats.nmad(diff) < 5
         except Exception as exception:
 
             if PLOT:
diff --git a/xdem/__init__.py b/xdem/__init__.py
index 4249d117..db1aeb21 100644
--- a/xdem/__init__.py
+++ b/xdem/__init__.py
@@ -1,4 +1,4 @@
-from . import coreg, dem, examples, spatial_tools, spatialstats, volume, filters, terrain
+from . import coreg, dem, examples, spatial_tools, spatialstats, volume, filters, fit, terrain
 from .ddem import dDEM
 from .dem import DEM
 from .demcollection import DEMCollection
diff --git a/xdem/coreg.py b/xdem/coreg.py
index 02614465..774d5aef 100644
--- a/xdem/coreg.py
+++ b/xdem/coreg.py
@@ -354,7 +354,7 @@ def ramp(x_coordinates: np.ndarray, y_coordinates: np.ndarray) -> np.ndarray:
     if metadata is not None:
         metadata["deramp"] = {
             "coefficients": coefficients,
-            "nmad": xdem.spatial_tools.nmad(residuals(coefficients, valid_diffs, valid_x_coords, valid_y_coords, degree))
+            "nmad": xdem.spatialstats.nmad(residuals(coefficients, valid_diffs, valid_x_coords, valid_y_coords, degree))
         }
 
     # Return the function which can be used later.
@@ -744,7 +744,7 @@ def error(self, reference_dem: Union[np.ndarray, np.ma.masked_array],
             inlier_mask=inlier_mask, transform=transform)
 
         error_functions = {
-            "nmad": xdem.spatial_tools.nmad,
+            "nmad": xdem.spatialstats.nmad,
             "median": np.median,
             "mean": np.mean,
             "std": np.std,
@@ -1246,7 +1246,7 @@ def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optiona
         # Calculate initial dDEM statistics
         elevation_difference = ref_dem - aligned_dem
         bias = np.nanmedian(elevation_difference)
-        nmad_old = xdem.spatial_tools.nmad(elevation_difference)
+        nmad_old = xdem.spatialstats.nmad(elevation_difference)
         if verbose:
             print("   Statistics on initial dh:")
             print("      Median = {:.2f} - NMAD = {:.2f}".format(bias, nmad_old))
@@ -1291,7 +1291,7 @@ def _fit_func(self, ref_dem: np.ndarray, tba_dem: np.ndarray, transform: Optiona
             # Update statistics
             elevation_difference = ref_dem - aligned_dem
             bias = np.nanmedian(elevation_difference)
-            nmad_new = xdem.spatial_tools.nmad(elevation_difference)
+            nmad_new = xdem.spatialstats.nmad(elevation_difference)
             nmad_gain = (nmad_new - nmad_old) / nmad_old * 100
 
             if verbose:
diff --git a/xdem/fit.py b/xdem/fit.py
new file mode 100644
index 00000000..493983c1
--- /dev/null
+++ b/xdem/fit.py
@@ -0,0 +1,402 @@
+"""
+Functions to perform normal, weighted and robust fitting.
+"""
+from __future__ import annotations
+
+import inspect
+from typing import Callable, Union, Sized, Optional
+import warnings
+
+import numpy as np
+import pandas as pd
+import scipy.optimize
+
+from xdem.spatialstats import nd_binning
+from xdem.spatial_tools import subsample_raster
+
+try:
+    from sklearn.metrics import mean_squared_error, median_absolute_error
+    from sklearn.linear_model import (
+        LinearRegression, TheilSenRegressor, RANSACRegressor, HuberRegressor)
+    from sklearn.pipeline import make_pipeline
+    from sklearn.preprocessing import PolynomialFeatures, RobustScaler
+    _has_sklearn = True
+except ImportError:
+    _has_sklearn = False
+
+
+def rmse(z: np.ndarray) -> float:
+    """
+    Return the root mean square error.
+
+    :param z: Residuals between predicted and true values
+    :return: Root mean square error
+    """
+    return np.sqrt(np.nanmean(np.square(z)))
+
+
+def huber_loss(z: np.ndarray) -> float:
+    """
+    Huber loss cost (reduces the weight of outliers).
+
+    :param z: Residuals between predicted and true values
+    :return: Huber cost
+    """
+    # Quadratic cost for residuals within [-1, 1], linear-type cost beyond (both branches must share z's shape)
+    out = np.where(np.abs(z) > 1, 2 * np.sqrt(np.abs(z)) - 1, np.square(z))
+
+    return out.sum()
+
+
+def soft_loss(z: np.ndarray, scale: float = 0.5) -> float:
+    """
+    Soft loss cost (reduces the weight of outliers).
+
+    :param z: Residuals between predicted and true values
+    :param scale: Scale factor
+    :return: Soft loss cost
+    """
+    return np.sum(np.square(scale) * 2 * (np.sqrt(1 + np.square(z / scale)) - 1))
+
+
+def _costfun_sumofsin(p, x, y, cost_func):
+    """
+    Calculate the robust cost function for a sum of sinusoids.
+    """
+    z = y - _sumofsinval(x, p)
+    return cost_func(z)
+
+
+def _choice_best_order(cost: np.ndarray, margin_improvement: float = 20., verbose: bool = False) -> int:
+    """
+    Choice of the best order (polynomial, sum of sinusoids) with a margin of improvement. The best cost value does
+    not necessarily mean the best predictive fit, because high-degree polynomials (and large sums of sinusoids)
+    tend to overfit. To mitigate this issue, we choose the lowest order from which the improvement becomes negligible.
+
+    :param cost: cost function residuals to the polynomial
+    :param margin_improvement: improvement margin (percentage) below which the lesser degree polynomial is kept
+    :param verbose: if text should be printed
+
+    :return: degree: degree for the best-fit polynomial
+    """
+
+    # Get the percentage of spread from the minimal cost
+    ind_min = cost.argmin()
+    min_cost = cost[ind_min]
+    perc_cost_improv = (cost - min_cost) / min_cost
+
+    # Costs within the margin, limited to orders not higher than the minimum-cost order
+    below_margin = np.logical_and(perc_cost_improv < margin_improvement / 100., np.arange(len(cost)) <= ind_min)
+    # Take the lowest qualifying order (the first True index)
+    ind = np.arange(len(cost))[below_margin][0]
+
+    if verbose:
+        print('Order ' + str(ind_min + 1) + ' has the minimum cost value of ' + str(min_cost))
+        print('Order ' + str(ind + 1) + ' is selected within a ' + str(margin_improvement) + ' % margin of'
+              ' the minimum cost, with a cost value of ' + str(cost[ind]))
+
+    return ind
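To make the margin-of-improvement rule concrete, a standalone toy run of the selection logic above (the cost values are invented):

```python
import numpy as np

cost = np.array([120.0, 30.0, 10.5, 10.0, 9.8, 9.7])  # toy costs for degrees 1..6
margin = 20.0  # percent

ind_min = cost.argmin()
perc_improv = (cost - cost[ind_min]) / cost[ind_min]
below_margin = np.logical_and(perc_improv < margin / 100.0, np.arange(len(cost)) <= ind_min)

best_order = np.arange(len(cost))[below_margin][0] + 1
print(best_order)  # 3: degree 3 is already within 20 % of the degree-6 minimum
```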
+def _wrapper_scipy_leastsquares(residual_func, p0, x, y, verbose, **kwargs):
+    """
+    Wrapper function for scipy.optimize.least_squares: passes down keyword arguments, extracts the cost and final
+    parameters, and prints statements in the console.
+
+    :param residual_func: Residual function to fit
+    :param p0: Initial guess
+    :param x: X vector
+    :param y: Y vector
+    :param verbose: Whether to print out statements
+    :return: Final cost and estimated coefficients
+    """
+
+    # Get the arguments of scipy.optimize.least_squares
+    fun_args = scipy.optimize.least_squares.__code__.co_varnames[:scipy.optimize.least_squares.__code__.co_argcount]
+    # Check that no other argument is left to be passed
+    remaining_kwargs = kwargs.copy()
+    for arg in fun_args:
+        remaining_kwargs.pop(arg, None)
+    if len(remaining_kwargs) != 0:
+        warnings.warn('Keyword arguments: ' + ','.join(list(remaining_kwargs.keys())) + ' were not used.')
+    # Filter the corresponding arguments before passing
+    filtered_kwargs = {k: kwargs[k] for k in fun_args if k in kwargs}
+
+    # Run the function with the associated keyword arguments
+    myresults = scipy.optimize.least_squares(residual_func, p0, args=(x, y), **filtered_kwargs)
+    if verbose:
+        print("Initial Parameters: ", p0)
+        print("Status: ", myresults.success, " - ", myresults.status)
+        print(myresults.message)
+        print("Lowest cost:", myresults.cost)
+        print("Parameters:", myresults.x)
+    cost = myresults.cost
+    coefs = myresults.x
+
+    return cost, coefs
+
+
+def _wrapper_sklearn_robustlinear(model, estimator_name, cost_func, x, y, **kwargs):
+    """
+    Wrapper function for sklearn.linear_model estimators: passes down keyword arguments, extracts the cost and final
+    parameters, sets random states, scales input and de-scales output data, and prints statements.
+
+    :param model: Function model to fit (e.g., polynomial features)
+    :param estimator_name: Linear estimator to use (one of "Linear", "Theil-Sen", "RANSAC" and "Huber")
+    :param cost_func: Cost function to use for optimization
+    :param x: X vector
+    :param y: Y vector
+    :return: Final cost and estimated coefficients
+    """
+    # Select the sklearn estimator
+    dict_estimators = {'Linear': LinearRegression, 'Theil-Sen': TheilSenRegressor,
+                       'RANSAC': RANSACRegressor, 'Huber': HuberRegressor}
+
+    est = dict_estimators[estimator_name]
+
+    # Get the existing arguments of the sklearn estimator and model
+    estimator_args = list(inspect.signature(est.__init__).parameters.keys())
+
+    # Check that no other argument is left to be passed
+    remaining_kwargs = kwargs.copy()
+    for arg in estimator_args:
+        remaining_kwargs.pop(arg, None)
+    if len(remaining_kwargs) != 0:
+        warnings.warn('Keyword arguments: ' + ','.join(list(remaining_kwargs.keys())) + ' were not used.')
+    # Filter the corresponding arguments before passing
+    filtered_kwargs = {k: kwargs[k] for k in estimator_args if k in kwargs}
+
+    # TODO: Find out how to re-scale polynomial coefficients + doc on the best scaling for polynomials
+    # # Scale output data (important for ML algorithms):
+    # robust_scaler = RobustScaler().fit(x.reshape(-1,1))
+    # x_scaled = robust_scaler.transform(x.reshape(-1,1))
+    # # Fit scaled data
+    # model.fit(x_scaled, y)
+    # y_pred = model.predict(x_scaled)
+
+    # Initialize the estimator with its arguments
+    init_estimator = est(**filtered_kwargs)
+
+    # Create the pipeline
+    pipeline = make_pipeline(model, init_estimator)
+
+    # Run with the data
+    pipeline.fit(x.reshape(-1, 1), y)
+    y_pred = pipeline.predict(x.reshape(-1, 1))
+
+    # Calculate the cost
+    cost = cost_func(y_pred, y)
+
+    # Get the polynomial coefficients estimated with the estimators Linear, Theil-Sen and Huber
+    if estimator_name in ['Linear', 'Theil-Sen', 'Huber']:
+        coefs = init_estimator.coef_
+    # For some reason, RANSAC does not store the coefficients in the same place
+    else:
+        coefs = init_estimator.estimator_.coef_
+
+    return cost, coefs
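Both wrappers rely on the same pattern: introspect the target callable, forward only the keyword arguments it accepts, and warn about the rest. A self-contained sketch of that pattern (the helper name `filter_kwargs_for` is illustrative, not part of xdem):

```python
import inspect
import warnings

def filter_kwargs_for(func, **kwargs):
    """Keep only the kwargs accepted by func's signature; warn about leftovers."""
    accepted = set(inspect.signature(func).parameters)
    filtered = {k: v for k, v in kwargs.items() if k in accepted}
    leftover = set(kwargs) - accepted
    if leftover:
        warnings.warn('Keyword arguments: ' + ','.join(leftover) + ' were not used.')
    return filtered

# Example: 'max_iter' is passed through, 'bogus' only triggers the warning
def solver(x0, max_iter=100):
    return x0, max_iter

print(solver(0.0, **filter_kwargs_for(solver, max_iter=10, bogus=1)))
```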
+def robust_polynomial_fit(x: np.ndarray, y: np.ndarray, max_order: int = 6, estimator_name: str = 'Theil-Sen',
+                          cost_func: Callable = median_absolute_error, margin_improvement: float = 20.,
+                          subsample: Union[float, int] = 25000, linear_pkg='sklearn', verbose: bool = False,
+                          random_state: None | np.random.RandomState | np.random.Generator | int = None,
+                          **kwargs) -> tuple[np.ndarray, int]:
+    """
+    Given 1D vectors x and y, compute a robust polynomial fit to the data. The order is chosen automatically by
+    comparing residuals for multiple fit orders of a given estimator.
+    Any keyword argument will be passed down to scipy.optimize.least_squares and sklearn linear estimators.
+
+    :param x: input x data (N,)
+    :param y: input y data (N,)
+    :param max_order: maximum polynomial order tried for the fit
+    :param estimator_name: robust estimator to use, one of 'Linear', 'Theil-Sen', 'RANSAC' or 'Huber'
+    :param cost_func: cost function taking as input two vectors y (true y), y' (predicted y) of same length
+    :param margin_improvement: improvement margin (percentage) below which the lesser degree polynomial is kept
+    :param subsample: if <= 1, will be considered a fraction of valid pixels to extract;
+        if > 1, will be considered the number of pixels to extract
+    :param linear_pkg: package to use for the Linear estimator, one of 'scipy' and 'sklearn'
+    :param random_state: random seed for testing purposes
+    :param verbose: if text should be printed
+
+    :returns coefs, degree: polynomial coefficients and degree for the best-fit polynomial
+    """
+    if not isinstance(estimator_name, str) or estimator_name not in ['Linear', 'Theil-Sen', 'RANSAC', 'Huber']:
+        raise ValueError('Attribute estimator must be one of "Linear", "Theil-Sen", "RANSAC" or "Huber".')
+    if not isinstance(linear_pkg, str) or linear_pkg not in ['sklearn', 'scipy']:
+        raise ValueError('Attribute linear_pkg must be one of "scipy" or "sklearn".')
+
+    # Remove NaNs
+    valid_data = np.logical_and(np.isfinite(y), np.isfinite(x))
+    x = x[valid_data]
+    y = y[valid_data]
+
+    # Subsample the data
+    subsamp = subsample_raster(x, subsample=subsample, return_indices=True, random_state=random_state)
+    x = x[subsamp]
+    y = y[subsamp]
+
+    # Initialize the cost function values and output coefficients
+    list_costs = np.empty(max_order)
+    list_coeffs = np.zeros((max_order, max_order + 1))
+    # Loop on polynomial degrees
+    for deg in np.arange(1, max_order + 1):
+        # If the method is linear and the package is scipy
+        if estimator_name == 'Linear' and linear_pkg == 'scipy':
+
+            # Define the residual function to optimize with scipy
+            def fitfun_polynomial(xx, params):
+                return sum([p * (xx ** i) for i, p in enumerate(params)])
+
+            def residual_func(p, xx, yy):
+                return fitfun_polynomial(xx, p) - yy
+
+            # Define the initial guess (np.polyfit returns the highest order first: flip to match the ascending
+            # order used in fitfun_polynomial)
+            p0 = np.flip(np.polyfit(x, y, deg))
+
+            # Run the linear method with scipy
+            cost, coef = _wrapper_scipy_leastsquares(residual_func, p0, x, y, verbose=verbose, **kwargs)
+
+        else:
+            # Otherwise, we use sklearn
+            if not _has_sklearn:
+                raise ValueError("Optional dependency needed. Install 'scikit-learn'.")
+
+            # Define the polynomial model to insert in the pipeline
+            model = PolynomialFeatures(degree=deg)
+
+            # Run the linear method with sklearn
+            cost, coef = _wrapper_sklearn_robustlinear(model, estimator_name=estimator_name, cost_func=cost_func,
+                                                       x=x, y=y, **kwargs)
+
+        list_costs[deg - 1] = cost
+        list_coeffs[deg - 1, 0:coef.size] = coef
+
+    # Choose the best polynomial with a margin of improvement on the cost
+    final_index = _choice_best_order(cost=list_costs, margin_improvement=margin_improvement, verbose=verbose)
+
+    # The degree of the best polynomial corresponds to the index plus one
+    return np.trim_zeros(list_coeffs[final_index], 'b'), final_index + 1
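A condensed usage sketch of `robust_polynomial_fit`, mirroring the tests earlier in this diff (the printed values are indicative, not guaranteed):

```python
import numpy as np
import xdem

x = np.linspace(1, 10, 1000)
true_coefs = [-100, 5, 3, 2]  # ascending order: -100 + 5x + 3x^2 + 2x^3
y = np.polyval(np.flip(true_coefs), x)
y[900:925] = 1000  # inject outliers

# Theil-Sen is the default robust estimator; the degree is selected automatically
coefs, deg = xdem.fit.robust_polynomial_fit(x, y, random_state=42)
print(deg, coefs)  # expect degree 3, coefficients near true_coefs
```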
+def _sumofsinval(x: np.ndarray, params: np.ndarray) -> np.ndarray:
+    """
+    Function for a sum of N frequency sinusoids.
+
+    :param x: array of coordinates (N,)
+    :param params: flattened array of (amplitude, frequency, phase) triplets; each frequency term b enters the
+        formula as a * sin(2 * pi / b * x + c)
+    """
+    aix = np.arange(0, params.size, 3)
+    bix = np.arange(1, params.size, 3)
+    cix = np.arange(2, params.size, 3)
+
+    val = np.sum(params[aix] * np.sin(2 * np.pi / params[bix] * x[:, np.newaxis] + params[cix]), axis=1)
+
+    return val
+def robust_sumsin_fit(x: np.ndarray, y: np.ndarray, nb_frequency_max: int = 3,
+                      bounds_amp_freq_phase: Optional[list[tuple[float, float]]] = None,
+                      cost_func: Callable = soft_loss, subsample: Union[float, int] = 25000,
+                      hop_length: Optional[float] = None,
+                      random_state: None | np.random.RandomState | np.random.Generator | int = None,
+                      verbose: bool = False) -> tuple[np.ndarray, int]:
+    """
+    Given 1D vectors x and y, compute a robust sum of sinusoid fit to the data. The number of frequencies is chosen
+    automatically by comparing residuals for multiple fit orders of a given estimator.
+    Any keyword argument will be passed down to scipy.optimize.basinhopping.
+
+    :param x: input x data (N,)
+    :param y: input y data (N,)
+    :param nb_frequency_max: maximum number of sinusoid frequencies
+    :param bounds_amp_freq_phase: bounds for amplitude, frequency and phase: three (lower, upper) tuples per
+        frequency, with the mean value used for initialization
+    :param hop_length: jump in function values to optimize the basinhopping algorithm search (for best results,
+        should be comparable to the separation, in function value, between local minima)
+    :param cost_func: cost function taking as input two vectors y (true y), y' (predicted y) of same length
+    :param subsample: if <= 1, will be considered a fraction of valid pixels to extract;
+        if > 1, will be considered the number of pixels to extract
+    :param random_state: random seed for testing purposes
+    :param verbose: if text should be printed
+
+    :returns coefs, nb_freq: sinusoid coefficients (amplitude, frequency, phase) and number of frequencies for
+        the best fit
+    """
+
+    def wrapper_costfun_sumofsin(p, x, y):
+        return _costfun_sumofsin(p, x, y, cost_func=cost_func)
+
+    # First, remove NaNs
+    valid_data = np.logical_and(np.isfinite(y), np.isfinite(x))
+    x = x[valid_data]
+    y = y[valid_data]
+
+    # If no significant resolution is provided, assume it is the mean difference between sampled X values
+    if hop_length is None:
+        x_sorted = np.sort(x)
+        hop_length = np.mean(np.diff(x_sorted))
+
+    # Use binned statistics for the first guess
+    nb_bin = int((x.max() - x.min()) / (5 * hop_length))
+    df = nd_binning(y, [x], ['var'], list_var_bins=nb_bin, statistics=[np.nanmedian])
+    # Compute the first guess for x and y
+    x_fg = pd.IntervalIndex(df['var']).mid.values
+    y_fg = df['nanmedian']
+    valid_fg = np.logical_and(np.isfinite(x_fg), np.isfinite(y_fg))
+    x_fg = x_fg[valid_fg]
+    y_fg = y_fg[valid_fg]
+
+    # Loop on all numbers of frequencies
+    costs = np.empty(nb_frequency_max)
+    amp_freq_phase = np.zeros((nb_frequency_max, 3 * nb_frequency_max)) * np.nan
+
+    for nb_freq in np.arange(1, nb_frequency_max + 1):
+
+        b = bounds_amp_freq_phase
+        # If bounds are not provided, define the largest possible bounds
+        if b is None:
+            lb_amp = 0
+            ub_amp = (y_fg.max() - y_fg.min()) / 2
+            # Define for the phase
+            lb_phase = 0
+            ub_phase = 2 * np.pi
+            # Define for the frequency; we need at least 5 points to see any kind of periodic signal
+            lb_frequency = 1 / (5 * (x.max() - x.min()))
+            ub_frequency = 1 / (5 * hop_length)
+
+            b = []
+            for i in range(nb_freq):
+                b += [(lb_amp, ub_amp), (lb_frequency, ub_frequency), (lb_phase, ub_phase)]
+
+        # Format the lower and upper bounds for scipy
+        lb = np.asarray([b[i][0] for i in range(3 * nb_freq)])
+        ub = np.asarray([b[i][1] for i in range(3 * nb_freq)])
+        # Insert them in a scipy bounds object
+        scipy_bounds = scipy.optimize.Bounds(lb, ub)
+        # First guess for the mean parameters
+        p0 = np.divide(lb + ub, 2)
+
+        # Initialize with the first guess
+        init_args = dict(args=(x_fg, y_fg), method="L-BFGS-B",
+                         bounds=scipy_bounds, options={"ftol": 1E-6})
+        init_results = scipy.optimize.basinhopping(wrapper_costfun_sumofsin, p0, disp=verbose,
+                                                   T=hop_length, minimizer_kwargs=init_args, seed=random_state)
+        init_results = init_results.lowest_optimization_result
+
+        # Subsample the data
+        subsamp = subsample_raster(x, subsample=subsample, return_indices=True, random_state=random_state)
+        x = x[subsamp]
+        y = y[subsamp]
+
+        # Minimize the globalization with a larger number of points
+        minimizer_kwargs = dict(args=(x, y),
+                                method="L-BFGS-B",
+                                bounds=scipy_bounds,
+                                options={"ftol": 1E-6})
+        myresults = scipy.optimize.basinhopping(wrapper_costfun_sumofsin, init_results.x, disp=verbose,
+                                                T=5 * hop_length, niter_success=40,
+                                                minimizer_kwargs=minimizer_kwargs, seed=random_state)
+        myresults = myresults.lowest_optimization_result
+        # Write the results for this number of frequencies
+        costs[nb_freq - 1] = wrapper_costfun_sumofsin(myresults.x, x, y)
+        amp_freq_phase[nb_freq - 1, 0:3 * nb_freq] = myresults.x
+
+    final_index = _choice_best_order(cost=costs)
+
+    final_coefs = amp_freq_phase[final_index][~np.isnan(amp_freq_phase[final_index])]
+
+    # If an amplitude coefficient is almost zero, remove the coefficients of that frequency and lower the degree
+    final_degree = final_index + 1
+    for i in range(final_index + 1):
+        if np.abs(final_coefs[3 * i]) < (np.nanpercentile(x, 90) - np.nanpercentile(x, 10)) / 1000:
+            final_coefs = np.delete(final_coefs, slice(3 * i, 3 * i + 3))
+            final_degree -= 1
+            break
+
+    # The number of frequencies corresponds to the final index plus one
+    return final_coefs, final_degree
+
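A usage sketch for `robust_sumsin_fit`, following the test values; the synthetic signal is built with plain NumPy using the same `a * sin(2*pi / b * x + c)` form as `_sumofsinval`:

```python
import numpy as np
import xdem

x = np.linspace(0, 10, 1000)
# Two sinusoids: (amplitude, frequency, phase) = (5, 1, pi) and (3, 0.3, 0)
y = 5 * np.sin(2 * np.pi / 1 * x + np.pi) + 3 * np.sin(2 * np.pi / 0.3 * x + 0)

coefs, nb_freq = xdem.fit.robust_sumsin_fit(x, y, random_state=42)
print(nb_freq, coefs)  # expect 2 frequencies, parameters close to the inputs
```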
diff --git a/xdem/spatial_tools.py b/xdem/spatial_tools.py
index 869832ae..05d1f59a 100644
--- a/xdem/spatial_tools.py
+++ b/xdem/spatial_tools.py
@@ -1,20 +1,20 @@
 """Basic operations to be run on 2D arrays and DEMs"""
 from __future__ import annotations
-from typing import Callable, Union, Sized
+
+from typing import Callable, Union, Sized, Optional
 import warnings
+import itertools
 
 import geoutils as gu
 from geoutils.georaster import RasterType
 import numpy as np
 import rasterio as rio
 import rasterio.warp
 from tqdm import tqdm
-import numba
 import skimage.transform
 
 from xdem.misc import deprecate
 
-
 def get_mask(array: Union[np.ndarray, np.ma.masked_array]) -> np.ndarray:
     """
     Return the mask of invalid values, whether array is a ndarray with NaNs or a np.ma.masked_array.
@@ -85,23 +85,6 @@ def get_valid_extent(array: Union[np.ndarray, np.ma.masked_array]) -> tuple:
     rows_nonzero = np.where(np.count_nonzero(valid_mask, axis=1) > 0)[0]
     return rows_nonzero[0], rows_nonzero[-1], cols_nonzero[0], cols_nonzero[-1]
 
-
-def nmad(data: np.ndarray, nfact: float = 1.4826) -> float:
-    """
-    Calculate the normalized median absolute deviation (NMAD) of an array.
-
-    :param data: input data
-    :param nfact: normalization factor for the data; default is 1.4826
-
-    :returns nmad: (normalized) median absolute deviation of data.
-    """
-    if isinstance(data, np.ma.masked_array):
-        data_arr = get_array_and_mask(data, check_shape=False)[0]
-    else:
-        data_arr = np.asarray(data)
-    return nfact * np.nanmedian(np.abs(data_arr - np.nanmedian(data_arr)))
-
-
 def resampling_method_from_str(method_str: str) -> rio.warp.Resampling:
     """Get a rasterio resampling method from a string representation, e.g. "cubic_spline"."""
     # Try to match the string version of the resampling method with a rio Resampling enum name
@@ -394,7 +377,6 @@ def _get_closest_rectangle(size: int) -> tuple[int, int]:
 
     raise NotImplementedError(f"Function criteria not met for rectangle of size: {size}")
 
-
 def subdivide_array(shape: tuple[int, ...], count: int) -> np.ndarray:
     """
     Create indices for subdivision of an array in a number of blocks.
@@ -447,6 +429,34 @@ def subdivide_array(shape: tuple[int, ...], count: int) -> np.ndarray:
 
     return indices.reshape(shape)
 
+def get_xy_rotated(raster: gu.georaster.Raster, along_track_angle: float) -> tuple[np.ndarray, np.ndarray]:
+    """
+    Rotate the x, y axes of an image to get along- and cross-track distances.
+
+    :param raster: raster to get x, y positions from
+    :param along_track_angle: angle by which to rotate the axes (degrees)
+
+    :returns xxr, yyr: arrays corresponding to along-track (x) and cross-track (y) distances
+    """
+
+    myang = np.deg2rad(along_track_angle)
+
+    # Get grid coordinates
+    xx, yy = raster.coords(grid=True)
+    xx -= np.min(xx)
+    yy -= np.min(yy)
+
+    # Get rotated coordinates (the angle must be in radians in every term)
+
+    # For along-track
+    xxr = np.multiply(xx, np.cos(myang)) + np.multiply(-1 * yy, np.sin(myang))
+    # For cross-track
+    yyr = np.multiply(xx, np.sin(myang)) + np.multiply(yy, np.cos(myang))
+
+    # Re-initialize the coordinates at zero
+    xxr -= np.nanmin(xxr)
+    yyr -= np.nanmin(yyr)
+
+    return xxr, yyr
+
 def subsample_raster(
     array: Union[np.ndarray, np.ma.masked_array], subsample: Union[float, int], return_indices: bool = False,
@@ -494,4 +504,4 @@ def subsample_raster(
 
         return unraveled_indices
 
     else:
-        return array[unraveled_indices]
+        return array[unraveled_indices]
\ No newline at end of file
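`get_xy_rotated` applies a standard 2D coordinate rotation. A quick standalone NumPy check of the formulas, on a made-up grid:

```python
import numpy as np

angle = np.deg2rad(30)  # along-track angle of 30 degrees
xx, yy = np.meshgrid(np.arange(4.0), np.arange(3.0))  # small synthetic grid

# Along-track: x' = x*cos(a) - y*sin(a); cross-track: y' = x*sin(a) + y*cos(a)
xxr = xx * np.cos(angle) - yy * np.sin(angle)
yyr = xx * np.sin(angle) + yy * np.cos(angle)

# A rotation preserves distances from the origin
assert np.allclose(np.hypot(xx, yy), np.hypot(xxr, yyr))
```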
diff --git a/xdem/spatialstats.py b/xdem/spatialstats.py
index 8fc801c8..24f2e4b7 100644
--- a/xdem/spatialstats.py
+++ b/xdem/spatialstats.py
@@ -21,7 +21,7 @@
 from skimage.draw import disk
 from scipy.interpolate import RegularGridInterpolator, LinearNDInterpolator, griddata
 from scipy.stats import binned_statistic, binned_statistic_2d, binned_statistic_dd
-from xdem.spatial_tools import nmad, subsample_raster, get_array_and_mask
+from xdem.spatial_tools import subsample_raster, get_array_and_mask
 from geoutils.georaster import RasterType, Raster
 
 with warnings.catch_warnings():
@@ -29,6 +29,21 @@
     import skgstat as skg
     from skgstat import models
 
+def nmad(data: np.ndarray, nfact: float = 1.4826) -> float:
+    """
+    Calculate the normalized median absolute deviation (NMAD) of an array.
+
+    :param data: input data
+    :param nfact: normalization factor for the data; default is 1.4826
+
+    :returns nmad: (normalized) median absolute deviation of data.
+    """
+    if isinstance(data, np.ma.masked_array):
+        data_arr = get_array_and_mask(data, check_shape=False)[0]
+    else:
+        data_arr = np.asarray(data)
+    return nfact * np.nanmedian(np.abs(data_arr - np.nanmedian(data_arr)))
+
 def interp_nd_binning(df: pd.DataFrame, list_var_names: Union[str, list[str]],
                       statistic: Union[str, Callable[[np.ndarray], float]] = nmad,
                       min_count: Optional[int] = 100) -> Callable[[tuple[np.ndarray, ...]], np.ndarray]:
     """