diff --git a/skfda/datasets/__init__.py b/skfda/datasets/__init__.py
index 666495cef..4455af577 100644
--- a/skfda/datasets/__init__.py
+++ b/skfda/datasets/__init__.py
@@ -17,6 +17,7 @@
     "fetch_nox",
     "fetch_octane",
     "fetch_phoneme",
+    "fetch_physionet",
     "fetch_tecator",
     "fetch_ucr",
     "fetch_weather",
@@ -45,6 +46,7 @@
     fetch_nox as fetch_nox,
     fetch_octane as fetch_octane,
     fetch_phoneme as fetch_phoneme,
+    fetch_physionet as fetch_physionet,
     fetch_tecator as fetch_tecator,
     fetch_ucr as fetch_ucr,
     fetch_weather as fetch_weather,
diff --git a/skfda/datasets/_real_datasets.py b/skfda/datasets/_real_datasets.py
index 0ccc52ea7..061de6335 100644
--- a/skfda/datasets/_real_datasets.py
+++ b/skfda/datasets/_real_datasets.py
@@ -213,6 +213,140 @@ def fetch_ucr(
     return dataset
 
 
+def _physionet_to_fdatagrid(
+    name: str,
+    data: DataFrame,
+    mode: Literal[
+        None,
+        "pad_left",
+        "pad_right",
+        "truncate_left",
+        "truncate_right",
+    ],
+) -> FDataGrid:
+    """Build a FDataGrid from the ``signal`` column of a Physionet frame."""
+    column = data.loc[:, "signal"]
+    n_samples = len(column)
+    dim_codomain = column[0].shape[1]
+
+    min_len = min(s.shape[0] for s in column)
+    max_len = max(s.shape[0] for s in column)
+
+    if mode is None and min_len != max_len:
+        raise ValueError(
+            f"Dataset {name} has signals of different lengths. Use the "
+            f"'mode' parameter to set a common length",
+        )
+
+    # Padding keeps the longest signal, truncating keeps the shortest.
+    n_points = max_len if mode in {"pad_left", "pad_right"} else min_len
+
+    # Positions not covered by a signal remain NaN (padding values).
+    data_matrix = np.full(
+        shape=(n_samples, n_points, dim_codomain),
+        fill_value=np.nan,
+        dtype=column[0].dtype,
+    )
+
+    for i, sample in enumerate(column):
+        copy_len = min(sample.shape[0], n_points)
+
+        if mode in {None, "pad_right", "truncate_right"}:
+            data_matrix[i, :copy_len, :] = sample[:copy_len, :]
+        else:
+            data_matrix[i, -copy_len:, :] = sample[-copy_len:, :]
+
+    # Sample k is measured at time k / fs (fs = sampling frequency, in
+    # Hz), so the grid spans [0, (n_points - 1) / fs] seconds.
+    grid_points = np.linspace(
+        0,
+        (n_points - 1) / column.attrs["fs"],
+        n_points,
+    )
+
+    coordinate_names = [
+        f"{sig_name}({unit})"
+        for sig_name, unit in zip(
+            column.attrs["sig_name"],
+            column.attrs["units"],
+        )
+    ]
+
+    sample_names = list(data.index)
+
+    return FDataGrid(
+        data_matrix=data_matrix,
+        grid_points=grid_points,
+        dataset_name=name,
+        coordinate_names=coordinate_names,
+        sample_names=sample_names,
+    )
+
+
+def fetch_physionet(
+    name: str,
+    *,
+    return_X_y: bool = False,
+    as_frame: bool = True,
+    target_column: str | Sequence[str] | None = None,
+    mode: Literal[
+        None,
+        "pad_left",
+        "pad_right",
+        "truncate_left",
+        "truncate_right",
+    ] = None,
+    **kwargs: Any,
+) -> (
+    Bunch
+    | Tuple[NDArrayAny, NDArrayAny | None]
+    | Tuple[DataFrame, Series | DataFrame | None]
+):
+    """
+    Fetch a dataset from Physionet.
+
+    Args:
+        name: Dataset name.
+        return_X_y: If True, return the data and the target as a tuple
+            instead of a Bunch.
+        as_frame: If True, return Pandas objects instead of NumPy arrays.
+        target_column: Column (or columns) to use as the target.
+        mode: Strategy used to give every signal a common length when the
+            lengths differ: pad with NaN or truncate, at either end.
+        kwargs: Additional parameters for the function
+            :func:`skdatasets.repositories.physionet.fetch`.
+
+    Returns:
+        The dataset requested.
+
+    Examples:
+        >>> import skfda
+        >>> X, y = skfda.datasets.fetch_physionet(
+        ...     "ctu-uhb-ctgdb",
+        ...     return_X_y=True,
+        ...     mode="truncate_right",
+        ... )
+
+    """
+    repositories = _get_skdatasets_repositories()
+
+    dataset = repositories.physionet.fetch(name, as_frame=True, **kwargs)
+
+    fdatagrid = _physionet_to_fdatagrid(name, data=dataset.frame, mode=mode)
+
+    # Store the functional datum in place of the raw per-sample arrays.
+    dataset.frame.loc[:, "signal"] = pd.Series(
+        fdatagrid,
+        index=dataset.frame.index,
+    )
+
+    return repositories.base.dataset_from_dataframe(
+        dataset.frame,
+        return_X_y=return_X_y,
+        as_frame=as_frame,
+        target_column=target_column,
+    )
+
+
 def _fetch_cran_no_encoding_warning(*args: Any, **kwargs: Any) -> Any:
     # Probably non thread safe
     with warnings.catch_warnings():
diff --git a/skfda/preprocessing/__init__.py b/skfda/preprocessing/__init__.py
index 914d2edbc..4be32f89d 100644
--- a/skfda/preprocessing/__init__.py
+++ b/skfda/preprocessing/__init__.py
@@ -6,6 +6,7 @@
     __name__,
     submodules=[
         "feature_construction",
+        "missing",
         "registration",
         "smoothing",
         "dim_reduction",
diff --git a/skfda/preprocessing/missing/__init__.py b/skfda/preprocessing/missing/__init__.py
new file mode 100644
index 000000000..dba04e3df
--- /dev/null
+++ b/skfda/preprocessing/missing/__init__.py
@@ -0,0 +1,4 @@
+"""Preprocessing of functional data with missing values."""
+from ._interpolate import (
+    MissingValuesInterpolation as MissingValuesInterpolation,
+)
diff --git a/skfda/preprocessing/missing/_interpolate.py b/skfda/preprocessing/missing/_interpolate.py
new file mode 100644
index 000000000..b01538e7d
--- /dev/null
+++ b/skfda/preprocessing/missing/_interpolate.py
@@ -0,0 +1,103 @@
+"""Interpolation of missing values in grid-discretized functional data."""
+from typing import Any, TypeVar
+
+import numpy as np
+from scipy.interpolate import (
+    InterpolatedUnivariateSpline,
+    LinearNDInterpolator,
+)
+
+from ..._utils._sklearn_adapter import BaseEstimator, InductiveTransformerMixin
+from ...representation import FDataGrid
+from ...typing._base import GridPoints
+from ...typing._numpy import NDArrayFloat, NDArrayInt
+
+T = TypeVar("T", bound=FDataGrid)
+
+
+def _coords_from_indices(
+    coord_indices: NDArrayInt,
+    grid_points: GridPoints,
+) -> NDArrayFloat:
+    """Translate rows of grid indices into grid-point coordinates."""
+    return np.stack([
+        grid_points[i][coord_index]
+        for i, coord_index in enumerate(coord_indices.T)
+    ]).T
+
+
+def _interpolate_nans(
+    fdatagrid: T,
+) -> T:
+    """Return a copy of the data with NaN values interpolated.
+
+    Every sample and codomain coordinate is processed independently:
+    the non-NaN points define a linear interpolator that is evaluated
+    at the NaN positions.
+    """
+    data_matrix = fdatagrid.data_matrix.copy()
+
+    for n_sample in range(fdatagrid.n_samples):
+        for n_coord in range(fdatagrid.dim_codomain):
+
+            data_points = data_matrix[n_sample, ..., n_coord]
+            nan_pos = np.isnan(data_points)
+            if not np.any(nan_pos):
+                # Nothing to fill for this coordinate function.
+                continue
+
+            valid_pos = ~nan_pos
+            coord_indices = np.argwhere(valid_pos)
+            desired_coord_indices = np.argwhere(nan_pos)
+            coords = _coords_from_indices(
+                coord_indices,
+                fdatagrid.grid_points,
+            )
+            desired_coords = _coords_from_indices(
+                desired_coord_indices,
+                fdatagrid.grid_points,
+            )
+            values = data_points[valid_pos]
+
+            if fdatagrid.dim_domain == 1:
+                # k=1: piecewise-linear spline; ext=3: extrapolate with
+                # the boundary value outside the observed range.
+                interpolation = InterpolatedUnivariateSpline(
+                    coords,
+                    values,
+                    k=1,
+                    ext=3,
+                )
+            else:
+                interpolation = LinearNDInterpolator(
+                    coords,
+                    values,
+                )
+
+            new_values = interpolation(
+                desired_coords,
+            )
+
+            data_matrix[n_sample, nan_pos, n_coord] = new_values.ravel()
+
+    return fdatagrid.copy(data_matrix=data_matrix)
+
+
+class MissingValuesInterpolation(
+    BaseEstimator,
+    InductiveTransformerMixin[T, T, Any],
+):
+    """
+    Transformer that replaces NaN values with interpolated ones.
+
+    Data with a one-dimensional domain is filled using a linear spline
+    (with constant extrapolation at the boundaries); higher-dimensional
+    domains use piecewise-linear interpolation over the grid points.
+    """
+
+    def transform(
+        self,
+        X: T,
+    ) -> T:
+        """Interpolate the missing values of each sample of X."""
+        return _interpolate_nans(X)