diff --git a/docs/HISTORY.md b/docs/HISTORY.md index a64d469e..4800673b 100644 --- a/docs/HISTORY.md +++ b/docs/HISTORY.md @@ -14,6 +14,12 @@ and [Edoardo Balzani](https://www.simonsfoundation.org/people/edoardo-balzani/) of the Flatiron institute. +0.7.1 (2024-09-24) +------------------ + +- Fixing nan issue when computing 1d tuning curve (See issue #334). +- Refactor tuning curves and correlogram tests. +- Adding validators decorators for tuning curves and correlogram modules. 0.7.0 (2024-09-16) ------------------ diff --git a/docs/index.md b/docs/index.md index d517eeb1..0d3a7739 100644 --- a/docs/index.md +++ b/docs/index.md @@ -12,9 +12,7 @@ PYthon Neural Analysis Package. pynapple is a light-weight python library for neurophysiological data analysis. The goal is to offer a versatile set of tools to study typical data in the field, i.e. time series (spike times, behavioral events, etc.) and time intervals (trials, brain states, etc.). It also provides users with generic functions for neuroscience such as tuning curves and cross-correlograms. - Free software: MIT License -- __Documentation__: -- __Notebooks and tutorials__ : - +- __Documentation__: > **Note** diff --git a/pynapple/__init__.py b/pynapple/__init__.py index f55f6194..8b45ee8d 100644 --- a/pynapple/__init__.py +++ b/pynapple/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.7.0" +__version__ = "0.7.1" from .core import ( IntervalSet, Ts, diff --git a/pynapple/core/time_series.py b/pynapple/core/time_series.py index 2af7f269..be3c1bea 100644 --- a/pynapple/core/time_series.py +++ b/pynapple/core/time_series.py @@ -139,7 +139,7 @@ def size(self): return self.values.size def __array__(self, dtype=None): - return self.values.astype(dtype) + return np.asarray(self.values, dtype=dtype) def __array_ufunc__(self, ufunc, method, *args, **kwargs): # print("In __array_ufunc__") diff --git a/pynapple/process/correlograms.py b/pynapple/process/correlograms.py index 41140c18..73f34b0e 100644 --- a/pynapple/process/correlograms.py +++ b/pynapple/process/correlograms.py @@ -1,6 +1,19 @@ -"""Cross-correlograms """ +""" +This module holds the functions to compute discrete cross-correlogram +for timestamps data (i.e. spike times). +| Function | Description | +|------|------| +| `nap.compute_autocorrelogram` | Autocorrelograms from a TsGroup object | +| `nap.compute_crosscorrelogram` | Crosscorrelogram from a TsGroup object | +| `nap.compute_eventcorrelogram` | Crosscorrelogram between a TsGroup object and a Ts object | + +""" + +import inspect +from functools import wraps from itertools import combinations, product +from numbers import Number import numpy as np import pandas as pd @@ -9,9 +22,53 @@ from .. import core as nap -######################################################### -# CORRELATION -######################################################### +def _validate_correlograms_inputs(func): + @wraps(func) + def wrapper(*args, **kwargs): + # Validate each positional argument + sig = inspect.signature(func) + kwargs = sig.bind_partial(*args, **kwargs).arguments + + # Only TypeError here + if getattr(func, "__name__") == "compute_crosscorrelogram" and isinstance( + kwargs["group"], (tuple, list) + ): + if ( + not all([isinstance(g, nap.TsGroup) for g in kwargs["group"]]) + or len(kwargs["group"]) != 2 + ): + raise TypeError( + "Invalid type. Parameter group must be of type TsGroup or a tuple/list of (TsGroup, TsGroup)." + ) + else: + if not isinstance(kwargs["group"], nap.TsGroup): + msg = "Invalid type. Parameter group must be of type TsGroup" + if getattr(func, "__name__") == "compute_crosscorrelogram": + msg = msg + " or a tuple/list of (TsGroup, TsGroup)." + raise TypeError(msg) + + parameters_type = { + "binsize": Number, + "windowsize": Number, + "ep": nap.IntervalSet, + "norm": bool, + "time_units": str, + "reverse": bool, + "event": (nap.Ts, nap.Tsd), + } + for param, param_type in parameters_type.items(): + if param in kwargs: + if not isinstance(kwargs[param], param_type): + raise TypeError( + f"Invalid type. Parameter {param} must be of type {param_type}." + ) + + # Call the original function with validated inputs + return func(**kwargs) + + return wrapper + + @jit(nopython=True) def _cross_correlogram(t1, t2, binsize, windowsize): """ @@ -81,6 +138,7 @@ def _cross_correlogram(t1, t2, binsize, windowsize): return C, B +@_validate_correlograms_inputs def compute_autocorrelogram( group, binsize, windowsize, ep=None, norm=True, time_units="s" ): @@ -118,13 +176,10 @@ def compute_autocorrelogram( RuntimeError group must be TsGroup """ - if type(group) is nap.TsGroup: - if isinstance(ep, nap.IntervalSet): - newgroup = group.restrict(ep) - else: - newgroup = group + if isinstance(ep, nap.IntervalSet): + newgroup = group.restrict(ep) else: - raise RuntimeError("Unknown format for group") + newgroup = group autocorrs = {} @@ -152,6 +207,7 @@ def compute_autocorrelogram( return autocorrs.astype("float") +@_validate_correlograms_inputs def compute_crosscorrelogram( group, binsize, windowsize, ep=None, norm=True, time_units="s", reverse=False ): @@ -207,7 +263,24 @@ def compute_crosscorrelogram( np.array([windowsize], dtype=np.float64), time_units )[0] - if isinstance(group, nap.TsGroup): + if isinstance(group, tuple): + if isinstance(ep, nap.IntervalSet): + newgroup = [group[i].restrict(ep) for i in range(2)] + else: + newgroup = group + + pairs = product(list(newgroup[0].keys()), list(newgroup[1].keys())) + + for i, j in pairs: + spk1 = newgroup[0][i].index + spk2 = newgroup[1][j].index + auc, times = _cross_correlogram(spk1, spk2, binsize, windowsize) + if norm: + auc /= newgroup[1][j].rate + crosscorrs[(i, j)] = pd.Series(index=times, data=auc, dtype="float") + + crosscorrs = pd.DataFrame.from_dict(crosscorrs) + else: if isinstance(ep, nap.IntervalSet): newgroup = group.restrict(ep) else: @@ -232,34 +305,10 @@ def compute_crosscorrelogram( ) crosscorrs = crosscorrs / freq2 - elif ( - isinstance(group, (tuple, list)) - and len(group) == 2 - and all(map(lambda g: isinstance(g, nap.TsGroup), group)) - ): - if isinstance(ep, nap.IntervalSet): - newgroup = [group[i].restrict(ep) for i in range(2)] - else: - newgroup = group - - pairs = product(list(newgroup[0].keys()), list(newgroup[1].keys())) - - for i, j in pairs: - spk1 = newgroup[0][i].index - spk2 = newgroup[1][j].index - auc, times = _cross_correlogram(spk1, spk2, binsize, windowsize) - if norm: - auc /= newgroup[1][j].rate - crosscorrs[(i, j)] = pd.Series(index=times, data=auc, dtype="float") - - crosscorrs = pd.DataFrame.from_dict(crosscorrs) - - else: - raise RuntimeError("Unknown format for group") - return crosscorrs.astype("float") +@_validate_correlograms_inputs def compute_eventcorrelogram( group, event, binsize, windowsize, ep=None, norm=True, time_units="s" ): @@ -306,10 +355,7 @@ def compute_eventcorrelogram( else: tsd1 = event.restrict(ep).index - if type(group) is nap.TsGroup: - newgroup = group.restrict(ep) - else: - raise RuntimeError("Unknown format for group") + newgroup = group.restrict(ep) crosscorrs = {} diff --git a/pynapple/process/spectrum.py b/pynapple/process/spectrum.py index 30668c06..085ff8fa 100644 --- a/pynapple/process/spectrum.py +++ b/pynapple/process/spectrum.py @@ -1,19 +1,57 @@ """ -# Power spectral density - This module contains functions to compute power spectral density and mean power spectral density. +| Function | Description | +|------|------| +| `compute_power_spectral_density` | Compute Power Spectral Density over a single epoch | +| `compute_mean_power_spectral_density` | Compute Mean Power Spectral Density over multiple epochs | + """ +import inspect +from functools import wraps from numbers import Number import numpy as np import pandas as pd +from numba import njit from scipy import signal from .. import core as nap +def _validate_spectrum_inputs(func): + @wraps(func) + def wrapper(*args, **kwargs): + # Validate each positional argument + sig = inspect.signature(func) + kwargs = sig.bind_partial(*args, **kwargs).arguments + + parameters_type = { + "sig": (nap.Tsd, nap.TsdFrame), + "fs": Number, + "ep": nap.IntervalSet, + "full_range": bool, + "norm": bool, + "n": int, + "time_unit": str, + "interval_size": Number, + "overlap": float, + } + for param, param_type in parameters_type.items(): + if param in kwargs: + if not isinstance(kwargs[param], param_type): + raise TypeError( + f"Invalid type. Parameter {param} must be of type {param_type}." + ) + + # Call the original function with validated inputs + return func(**kwargs) + + return wrapper + + +@_validate_spectrum_inputs def compute_power_spectral_density( sig, fs=None, ep=None, full_range=False, norm=False, n=None ): @@ -45,25 +83,15 @@ def compute_power_spectral_density( Notes ----- - compute_spectogram computes fft on only a single epoch of data. This epoch be given with the ep + This function computes fft on only a single epoch of data. This epoch be given with the ep parameter otherwise will be sig.time_support, but it must only be a single epoch. """ - if not isinstance(sig, (nap.Tsd, nap.TsdFrame)): - raise TypeError("sig must be either a Tsd or a TsdFrame object.") - if not (fs is None or isinstance(fs, Number)): - raise TypeError("fs must be of type float or int") - if not (ep is None or isinstance(ep, nap.IntervalSet)): - raise TypeError("ep param must be a pynapple IntervalSet object, or None") if ep is None: ep = sig.time_support if len(ep) != 1: raise ValueError("Given epoch (or signal time_support) must have length 1") if fs is None: fs = sig.rate - if not isinstance(full_range, bool): - raise TypeError("full_range must be of type bool or None") - if not isinstance(norm, bool): - raise TypeError("norm must be of type bool") fft_result = np.fft.fft(sig.restrict(ep).values, n=n, axis=0) if n is None: @@ -81,10 +109,12 @@ def compute_power_spectral_density( return ret +@_validate_spectrum_inputs def compute_mean_power_spectral_density( sig, interval_size, fs=None, + overlap=0.25, ep=None, full_range=False, norm=False, @@ -95,7 +125,7 @@ def compute_mean_power_spectral_density( The parameter `interval_size` controls the duration of the epochs. - To imporve frequency resolution, the signal is multiplied by a Hamming window. + To improve frequency resolution, the signal is multiplied by a Hamming window. Note that this function assumes a constant sampling rate for `sig`. @@ -105,8 +135,11 @@ def compute_mean_power_spectral_density( Signal with equispaced samples interval_size : Number Epochs size to compute to average the FFT across - fs : None, optional + fs : Number, optional Sampling frequency of `sig`. If `None`, `fs` is equal to `sig.rate` + overlap : float, optional + Percentage of overlap between successive intervals. + `0.0 <= overlap < 1.0`. Default is 0.25 ep : None or pynapple.IntervalSet, optional The `IntervalSet` to calculate the fft on. Can be any length. full_range : bool, optional @@ -133,39 +166,30 @@ def compute_mean_power_spectral_density( ------ RuntimeError If splitting the epoch with `interval_size` results in an empty set. - TypeError - If `ep` or `sig` are not respectively pynapple time series or interval set. + ValueError + If overlap is not within [0, 1). """ - if not isinstance(sig, (nap.Tsd, nap.TsdFrame)): - raise TypeError("sig must be either a Tsd or a TsdFrame object.") + if not (0.0 <= overlap < 1.0): + raise ValueError("Overlap should be in intervals [0.0, 1.0).") - if not (ep is None or isinstance(ep, nap.IntervalSet)): - raise TypeError("ep param must be a pynapple IntervalSet object, or None") if ep is None: ep = sig.time_support - if not (fs is None or isinstance(fs, Number)): - raise TypeError("fs must be of type float or int") if fs is None: fs = sig.rate - if not isinstance(full_range, bool): - raise TypeError("full_range must be of type bool or None") - - if not isinstance(norm, bool): - raise TypeError("norm must be of type bool") - - # Split the ep interval_size = nap.TsIndex.format_timestamps(np.array([interval_size]), time_unit)[ 0 ] - split_ep = ep.split(interval_size) - if len(split_ep) == 0: + # Check if at least one epoch is larger than the interval size + if np.max(ep.end - ep.start) < interval_size: raise RuntimeError( f"Splitting epochs with interval_size={interval_size} generated an empty IntervalSet. Try decreasing interval_size" ) + split_ep = _overlap_split(ep.start, ep.end, interval_size, overlap) + # Get the slices of each ep slices = np.zeros((len(split_ep), 2), dtype=int) @@ -205,3 +229,24 @@ def compute_mean_power_spectral_density( if not full_range: return ret.loc[ret.index >= 0] return ret + + +@njit +def _overlap_split(start, end, interval_size, overlap): + N = int( + np.ceil(np.sum(end - start) / (interval_size * (1 - overlap))) + ) # upper bound + slices = np.zeros((N + 1, 2)) + + k = 0 # epochs + n = 0 + while k < len(start): + t = start[k] + while t + interval_size < end[k]: + slices[n, 0] = t + slices[n, 1] = t + interval_size + t += (1 - overlap) * interval_size + n += 1 + k += 1 + + return slices[0:n] diff --git a/pynapple/process/tuning_curves.py b/pynapple/process/tuning_curves.py index 21132387..254e6e24 100644 --- a/pynapple/process/tuning_curves.py +++ b/pynapple/process/tuning_curves.py @@ -1,12 +1,23 @@ # -*- coding: utf-8 -*- """ +You can compute tuning curves for features in 1 dimension or 2 dimension. + +| Function | Description | +|------|------| +| `nap.compute_discrete_tuning_curves` | Firing rate from a dictionnary of IntervalSet| +| `compute_1d_tuning_curves` | Firing rate as a function of a 1-d feature | +| `compute_2d_tuning_curves` | Firing rate as a function of a 2-d features | +| `compute_1d_tuning_curves_continuous` | Mean value as a function of a 1-d feature | +| `compute_2d_tuning_curves_continuous` | Mean value as a function of a 2-d features | +| `compute_1d_mutual_info` | Mutual information of a tuning curve computed from a 1-d feature. | +| `compute_2d_mutual_info` | Mutual information of a tuning curve computed from a 2-d features. | + """ -# @Author: gviejo -# @Date: 2022-01-02 23:33:42 -# @Last Modified by: Guillaume Viejo -# @Last Modified time: 2024-01-29 11:10:07 +import inspect import warnings +from collections.abc import Iterable +from functools import wraps import numpy as np import pandas as pd @@ -14,6 +25,74 @@ from .. import core as nap +def _validate_tuning_inputs(func): + @wraps(func) + def wrapper(*args, **kwargs): + # Validate each positional argument + sig = inspect.signature(func) + kwargs = sig.bind_partial(*args, **kwargs).arguments + + if "feature" in kwargs: + if not isinstance(kwargs["feature"], (nap.Tsd, nap.TsdFrame)): + raise TypeError( + "feature should be a Tsd (or TsdFrame with 1 column only)" + ) + if ( + isinstance(kwargs["feature"], nap.TsdFrame) + and not kwargs["feature"].shape[1] == 1 + ): + raise ValueError( + "feature should be a Tsd (or TsdFrame with 1 column only)" + ) + if "features" in kwargs: + if not isinstance(kwargs["features"], nap.TsdFrame): + raise TypeError("features should be a TsdFrame with 2 columns") + if not kwargs["features"].shape[1] == 2: + raise ValueError("features should have 2 columns only.") + if "nb_bins" in kwargs: + if not isinstance(kwargs["nb_bins"], (int, tuple)): + raise TypeError( + "nb_bins should be of type int (or tuple with (int, int) for 2D tuning curves)." + ) + if "group" in kwargs: + if not isinstance(kwargs["group"], nap.TsGroup): + raise TypeError("group should be a TsGroup.") + if "ep" in kwargs: + if not isinstance(kwargs["ep"], nap.IntervalSet): + raise TypeError("ep should be an IntervalSet") + if "minmax" in kwargs: + if not isinstance(kwargs["minmax"], Iterable): + raise TypeError("minmax should be a tuple/list of 2 numbers") + if "dict_ep" in kwargs: + if not isinstance(kwargs["dict_ep"], dict): + raise TypeError("dict_ep should be a dictionary of IntervalSet") + if not all( + [isinstance(v, nap.IntervalSet) for v in kwargs["dict_ep"].values()] + ): + raise TypeError("dict_ep argument should contain only IntervalSet.") + if "tc" in kwargs: + if not isinstance(kwargs["tc"], (pd.DataFrame, np.ndarray)): + raise TypeError( + "Argument tc should be of type pandas.DataFrame or numpy.ndarray" + ) + if "dict_tc" in kwargs: + if not isinstance(kwargs["dict_tc"], (dict, np.ndarray)): + raise TypeError( + "Argument dict_tc should be a dictionary of numpy.ndarray or numpy.ndarray." + ) + if "bitssec" in kwargs: + if not isinstance(kwargs["bitssec"], bool): + raise TypeError("Argument bitssec should be of type bool") + if "tsdframe" in kwargs: + if not isinstance(kwargs["tsdframe"], (nap.Tsd, nap.TsdFrame)): + raise TypeError("Argument tsdframe should be of type Tsd or TsdFrame.") + # Call the original function with validated inputs + return func(**kwargs) + + return wrapper + + +@_validate_tuning_inputs def compute_discrete_tuning_curves(group, dict_ep): """ Compute discrete tuning curves of a TsGroup using a dictionary of epochs. @@ -52,16 +131,7 @@ def compute_discrete_tuning_curves(group, dict_ep): RuntimeError If group is not a TsGroup object. """ - assert isinstance(group, nap.TsGroup), "group should be a TsGroup." - assert isinstance(dict_ep, dict), "dict_ep should be a dictionary of IntervalSet" idx = np.sort(list(dict_ep.keys())) - for k in idx: - assert isinstance( - dict_ep[k], nap.IntervalSet - ), "dict_ep argument should contain only IntervalSet. Key {} in dict_ep is not an IntervalSet".format( - k - ) - tuning_curves = pd.DataFrame(index=idx, columns=list(group.keys()), data=0.0) for k in dict_ep.keys(): @@ -73,6 +143,7 @@ def compute_discrete_tuning_curves(group, dict_ep): return tuning_curves +@_validate_tuning_inputs def compute_1d_tuning_curves(group, feature, nb_bins, ep=None, minmax=None): """ Computes 1-dimensional tuning curves relative to a 1d feature. @@ -103,48 +174,34 @@ def compute_1d_tuning_curves(group, feature, nb_bins, ep=None, minmax=None): If group is not a TsGroup object. """ - assert isinstance(group, nap.TsGroup), "group should be a TsGroup." - assert isinstance( - feature, (nap.Tsd, nap.TsdFrame) - ), "feature should be a Tsd (or TsdFrame with 1 column only)" - if isinstance(feature, nap.TsdFrame): - assert ( - feature.shape[1] == 1 - ), "feature should be a Tsd (or TsdFrame with 1 column only)" - assert isinstance(nb_bins, int) - + if minmax is not None and len(minmax) != 2: + raise ValueError("minmax should be of length 2.") if ep is None: ep = feature.time_support - else: - assert isinstance(ep, nap.IntervalSet), "ep should be an IntervalSet" if minmax is None: - bins = np.linspace(np.min(feature), np.max(feature), nb_bins + 1) + bins = np.linspace(np.nanmin(feature), np.nanmax(feature), nb_bins + 1) else: - assert isinstance(minmax, tuple), "minmax should be a tuple of boundaries" bins = np.linspace(minmax[0], minmax[1], nb_bins + 1) idx = bins[0:-1] + np.diff(bins) / 2 tuning_curves = pd.DataFrame(index=idx, columns=list(group.keys())) - if isinstance(ep, nap.IntervalSet): - group_value = group.value_from(feature, ep) - occupancy, _ = np.histogram(feature.restrict(ep).values, bins) - else: - group_value = group.value_from(feature) - occupancy, _ = np.histogram(feature.values, bins) + group_value = group.value_from(feature, ep) + + occupancy, _ = np.histogram(feature.restrict(ep).values, bins) for k in group_value: count, _ = np.histogram(group_value[k].values, bins) count = count / occupancy - count[np.isnan(count)] = 0.0 tuning_curves[k] = count tuning_curves[k] = count * feature.rate return tuning_curves +@_validate_tuning_inputs def compute_2d_tuning_curves(group, features, nb_bins, ep=None, minmax=None): """ Computes 2-dimensional tuning curves relative to a 2d features @@ -155,15 +212,15 @@ def compute_2d_tuning_curves(group, features, nb_bins, ep=None, minmax=None): The group of Ts/Tsd for which the tuning curves will be computed features : TsdFrame The 2d features (i.e. 2 columns features). - nb_bins : int - Number of bins in the tuning curves + nb_bins : int or tuple + Number of bins in the tuning curves (separate for 2 feature dimensions if tuple provided) ep : IntervalSet, optional The epoch on which tuning curves are computed. If None, the epoch is the time support of the feature. minmax : tuple or list, optional The min and max boundaries of the tuning curves given as: (minx, maxx, miny, maxy) - If None, the boundaries are inferred from the target variable + If None, the boundaries are inferred from the target features Returns ------- @@ -178,57 +235,57 @@ def compute_2d_tuning_curves(group, features, nb_bins, ep=None, minmax=None): If group is not a TsGroup object or if features is not 2 columns only. """ - assert isinstance(group, nap.TsGroup), "group should be a TsGroup." - assert isinstance( - features, nap.TsdFrame - ), "features should be a TsdFrame with 2 columns" - if isinstance(features, nap.TsdFrame): - assert features.shape[1] == 2, "features should have 2 columns only." - assert isinstance(nb_bins, int) + if minmax is not None and len(minmax) != 4: + raise ValueError("minmax should be of length 4.") + + if isinstance(nb_bins, tuple) and len(nb_bins) != 2: + raise ValueError( + "nb_bins should be of type int (or tuple with (int, int) for 2D tuning curves)." + ) + + if isinstance(nb_bins, int): + nb_bins = (nb_bins, nb_bins) if ep is None: ep = features.time_support else: - assert isinstance(ep, nap.IntervalSet), "ep should be an IntervalSet" features = features.restrict(ep) - cols = list(features.columns) groups_value = {} binsxy = {} - for i, c in enumerate(cols): - groups_value[c] = group.value_from(features.loc[c], ep) + for i in range(2): + groups_value[i] = group.value_from(features[:, i], ep) if minmax is None: bins = np.linspace( - np.min(features.loc[c]), np.max(features.loc[c]), nb_bins + 1 + np.nanmin(features[:, i]), np.nanmax(features[:, i]), nb_bins[i] + 1 ) else: - assert isinstance(minmax, tuple), "minmax should be a tuple of 4 elements" - bins = np.linspace(minmax[i + i % 2], minmax[i + 1 + i % 2], nb_bins + 1) - binsxy[c] = bins + bins = np.linspace(minmax[i + i % 2], minmax[i + 1 + i % 2], nb_bins[i] + 1) + binsxy[i] = bins occupancy, _, _ = np.histogram2d( - features.loc[cols[0]].values.flatten(), - features.loc[cols[1]].values.flatten(), - [binsxy[cols[0]], binsxy[cols[1]]], + features[:, 0].values.flatten(), + features[:, 1].values.flatten(), + [binsxy[0], binsxy[1]], ) tc = {} for n in group.keys(): count, _, _ = np.histogram2d( - groups_value[cols[0]][n].values.flatten(), - groups_value[cols[1]][n].values.flatten(), - [binsxy[cols[0]], binsxy[cols[1]]], + groups_value[0][n].values.flatten(), + groups_value[1][n].values.flatten(), + [binsxy[0], binsxy[1]], ) count = count / occupancy - # count[np.isnan(count)] = 0.0 tc[n] = count * features.rate - xy = [binsxy[c][0:-1] + np.diff(binsxy[c]) / 2 for c in binsxy.keys()] + xy = [binsxy[i][0:-1] + np.diff(binsxy[i]) / 2 for i in range(2)] return tc, xy +@_validate_tuning_inputs def compute_1d_mutual_info(tc, feature, ep=None, minmax=None, bitssec=False): """ Mutual information as defined in @@ -261,21 +318,13 @@ def compute_1d_mutual_info(tc, feature, ep=None, minmax=None, bitssec=False): if isinstance(tc, pd.DataFrame): columns = tc.columns.values fx = np.atleast_2d(tc.values) - elif isinstance(tc, np.ndarray): + else: fx = np.atleast_2d(tc) columns = np.arange(tc.shape[1]) - assert isinstance( - feature, (nap.Tsd, nap.TsdFrame) - ), "feature should be a Tsd (or TsdFrame with 1 column only)" - if isinstance(feature, nap.TsdFrame): - assert ( - feature.shape[1] == 1 - ), "feature should be a Tsd (or TsdFrame with 1 column only)" - nb_bins = tc.shape[0] + 1 if minmax is None: - bins = np.linspace(np.min(feature), np.max(feature), nb_bins) + bins = np.linspace(np.nanmin(feature), np.nanmax(feature), nb_bins) else: bins = np.linspace(minmax[0], minmax[1], nb_bins) @@ -303,7 +352,8 @@ def compute_1d_mutual_info(tc, feature, ep=None, minmax=None, bitssec=False): return SI -def compute_2d_mutual_info(tc, features, ep=None, minmax=None, bitssec=False): +@_validate_tuning_inputs +def compute_2d_mutual_info(dict_tc, features, ep=None, minmax=None, bitssec=False): """ Mutual information as defined in @@ -313,7 +363,7 @@ def compute_2d_mutual_info(tc, features, ep=None, minmax=None, bitssec=False): Parameters ---------- - tc : dict or numpy.ndarray + dict_tc : dict of numpy.ndarray or numpy.ndarray If array, first dimension should be the neuron features : TsdFrame The 2 columns features that were used to compute the tuning curves @@ -333,29 +383,21 @@ def compute_2d_mutual_info(tc, features, ep=None, minmax=None, bitssec=False): Spatial Information (default is bits/spikes) """ # A bit tedious here - if type(tc) is dict: - fx = np.array([tc[i] for i in tc.keys()]) - idx = list(tc.keys()) - elif type(tc) is np.ndarray: - fx = tc - idx = np.arange(len(tc)) - - assert isinstance( - features, nap.TsdFrame - ), "features should be a TsdFrame with 2 columns" - if isinstance(features, nap.TsdFrame): - assert features.shape[1] == 2, "features should have 2 columns only." + if type(dict_tc) is dict: + fx = np.array([dict_tc[i] for i in dict_tc.keys()]) + idx = list(dict_tc.keys()) + else: + fx = dict_tc + idx = np.arange(len(dict_tc)) nb_bins = (fx.shape[1] + 1, fx.shape[2] + 1) - cols = features.columns - bins = [] - for i, c in enumerate(cols): + for i in range(2): if minmax is None: bins.append( np.linspace( - np.min(features.loc[c]), np.max(features.loc[c]), nb_bins[i] + np.nanmin(features[:, i]), np.nanmax(features[:, i]), nb_bins[i] ) ) else: @@ -367,8 +409,8 @@ def compute_2d_mutual_info(tc, features, ep=None, minmax=None, bitssec=False): features = features.restrict(ep) occupancy, _, _ = np.histogram2d( - features.loc[cols[0]].values.flatten(), - features.loc[cols[1]].values.flatten(), + features[:, 0].values.flatten(), + features[:, 1].values.flatten(), [bins[0], bins[1]], ) occupancy = occupancy / occupancy.sum() @@ -391,16 +433,17 @@ def compute_2d_mutual_info(tc, features, ep=None, minmax=None, bitssec=False): return SI +@_validate_tuning_inputs def compute_1d_tuning_curves_continuous( tsdframe, feature, nb_bins, ep=None, minmax=None ): """ - Computes 1-dimensional tuning curves relative to a feature with continous data. + Computes 1-dimensional tuning curves relative to a feature with continuous data. Parameters ---------- tsdframe : Tsd or TsdFrame - Input data (e.g. continus calcium data + Input data (e.g. continuous calcium data where each column is the calcium activity of one neuron) feature : Tsd (or TsdFrame with 1 column only) The 1-dimensional target feature (e.g. head-direction) @@ -415,8 +458,7 @@ def compute_1d_tuning_curves_continuous( Returns ------- - pandas.DataFrame - DataFrame to hold the tuning curves + pandas.DataFrame to hold the tuning curves Raises ------ @@ -424,19 +466,13 @@ def compute_1d_tuning_curves_continuous( If tsdframe is not a Tsd or a TsdFrame object. """ - if not isinstance(tsdframe, (nap.Tsd, nap.TsdFrame)): - raise RuntimeError("Unknown format for tsdframe.") - elif isinstance(tsdframe, nap.Tsd): + if minmax is not None and len(minmax) != 2: + raise ValueError("minmax should be of length 2.") + + if isinstance(tsdframe, nap.Tsd): tsdframe = tsdframe[:, np.newaxis] - assert isinstance( - feature, (nap.Tsd, nap.TsdFrame) - ), "feature should be a Tsd (or TsdFrame with 1 column only)" - if isinstance(feature, nap.TsdFrame): - assert ( - feature.shape[1] == 1 - ), "feature should be a Tsd (or TsdFrame with 1 column only)" - feature = np.squeeze(feature) + feature = np.squeeze(feature) if isinstance(ep, nap.IntervalSet): feature = feature.restrict(ep) @@ -445,26 +481,34 @@ def compute_1d_tuning_curves_continuous( tsdframe = tsdframe.restrict(feature.time_support) if minmax is None: - bins = np.linspace(np.min(feature), np.max(feature), nb_bins + 1) + bins = np.linspace(np.nanmin(feature), np.nanmax(feature), nb_bins + 1) else: bins = np.linspace(minmax[0], minmax[1], nb_bins + 1) align_times = tsdframe.value_from(feature) idx = np.digitize(align_times.values, bins) - 1 - tmp = tsdframe.as_dataframe().groupby(idx).mean() - tmp = tmp.reindex(np.arange(0, len(bins) - 1)) - tmp.index = pd.Index(bins[0:-1] + np.diff(bins) / 2) - tmp = tmp.fillna(0) + tc = np.zeros((len(bins) - 1, tsdframe.shape[1])) + for i in range(0, nb_bins): + tc[i] = np.mean(tsdframe.values[idx == i], axis=0) + tc[np.isnan(tc)] = 0.0 - return pd.DataFrame(tmp) + # Assigning nans if bin is not visited. + occupancy, _ = np.histogram(feature, bins) + tc[occupancy == 0.0] = np.nan + + tc = pd.DataFrame( + index=bins[0:-1] + np.diff(bins) / 2, data=tc, columns=tsdframe.columns + ) + return tc +@_validate_tuning_inputs def compute_2d_tuning_curves_continuous( tsdframe, features, nb_bins, ep=None, minmax=None ): """ - Computes 2-dimensional tuning curves relative to a 2d feature with continous data. + Computes 2-dimensional tuning curves relative to a 2d feature with continuous data. Parameters ---------- @@ -496,16 +540,16 @@ def compute_2d_tuning_curves_continuous( If tsdframe is not a Tsd/TsdFrame or if features is not 2 columns """ - if not isinstance(tsdframe, (nap.Tsd, nap.TsdFrame)): - raise RuntimeError("Unknown format for tsdframe.") - elif isinstance(tsdframe, nap.Tsd): - tsdframe = tsdframe[:, np.newaxis] + if minmax is not None and len(minmax) != 4: + raise ValueError("minmax should be of length 4.") - assert isinstance( - features, nap.TsdFrame - ), "features should be a TsdFrame with 2 columns" - if isinstance(features, nap.TsdFrame): - assert features.shape[1] == 2, "features should have 2 columns only." + if isinstance(nb_bins, tuple) and len(nb_bins) != 2: + raise ValueError( + "nb_bins should be of type int (or tuple with (int, int) for 2D tuning curves)." + ) + + if isinstance(tsdframe, nap.Tsd): + tsdframe = tsdframe[:, np.newaxis] if isinstance(ep, nap.IntervalSet): features = features.restrict(ep) @@ -515,38 +559,46 @@ def compute_2d_tuning_curves_continuous( if isinstance(nb_bins, int): nb_bins = (nb_bins, nb_bins) - elif len(nb_bins) != 2: - raise RuntimeError("nb_bins should be int or tuple of 2 ints") - cols = list(features.columns) + binsxy = [] + idxs = [] - binsxy = {} - idxs = {} - - for i, c in enumerate(cols): + for i in range(2): if minmax is None: bins = np.linspace( - np.min(features.loc[c]), np.max(features.loc[c]), nb_bins[i] + 1 + np.nanmin(features[:, i]), np.nanmax(features[:, i]), nb_bins[i] + 1 ) else: bins = np.linspace(minmax[i + i % 2], minmax[i + 1 + i % 2], nb_bins[i] + 1) - align_times = tsdframe.value_from(features.loc[c], ep) - idxs[c] = np.digitize(align_times.values.flatten(), bins) - 1 - binsxy[c] = bins + align_times = tsdframe.value_from(features[:, i], ep) + idxs.append(np.digitize(align_times.values.flatten(), bins) - 1) + binsxy.append(bins) + + idxs = np.transpose(np.array(idxs)) - idxs = pd.DataFrame(idxs) + tc = np.zeros((tsdframe.shape[1], nb_bins[0], nb_bins[1])) - tc_np = np.zeros((tsdframe.shape[1], nb_bins[0], nb_bins[1])) * np.nan + for i in range(nb_bins[0]): + for j in range(nb_bins[1]): + tc[:, i, j] = np.mean( + tsdframe.values[np.logical_and(idxs[:, 0] == i, idxs[:, 1] == j)], 0 + ) - for k, tmp in idxs.groupby(cols): - if (0 <= k[0] < nb_bins[0]) and (0 <= k[1] < nb_bins[1]): - tc_np[:, k[0], k[1]] = np.mean(tsdframe[tmp.index.values].values, 0) + tc[np.isnan(tc)] = 0.0 - tc_np[np.isnan(tc_np)] = 0.0 + # Assigning nans if bin is not visited. + occupancy, _, _ = np.histogram2d( + features[:, 0].values.flatten(), + features[:, 1].values.flatten(), + [binsxy[0], binsxy[1]], + ) + occupancy = occupancy[np.newaxis, :, :] + occupancy = np.repeat(occupancy, len(tc), axis=0) + tc[occupancy == 0.0] = np.nan - xy = [binsxy[c][0:-1] + np.diff(binsxy[c]) / 2 for c in binsxy.keys()] + xy = [binsxy[i][0:-1] + np.diff(binsxy[i]) / 2 for i in range(2)] - tc = {c: tc_np[i] for i, c in enumerate(tsdframe.columns)} + tc = {c: tc[i] for i, c in enumerate(tsdframe.columns)} return tc, xy diff --git a/pyproject.toml b/pyproject.toml index 73afed77..8ec26e35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pynapple" -version = "0.7.0" +version = "0.7.1" description = "PYthon Neural Analysis Package Pour Laboratoires d’Excellence" readme = "README.md" authors = [{ name = "Guillaume Viejo", email = "guillaume.viejo@gmail.com" }] diff --git a/setup.py b/setup.py index d8fc430f..19f677a3 100644 --- a/setup.py +++ b/setup.py @@ -59,8 +59,8 @@ test_suite='tests', tests_require=test_requirements, url='https://github.com/pynapple-org/pynapple', - version='v0.7.0', + version='v0.7.1', zip_safe=False, long_description_content_type='text/markdown', - download_url='https://github.com/pynapple-org/pynapple/archive/refs/tags/v0.7.0.tar.gz' + download_url='https://github.com/pynapple-org/pynapple/archive/refs/tags/v0.7.1.tar.gz' ) diff --git a/tests/test_correlograms.py b/tests/test_correlograms.py index 185c3522..7445452e 100644 --- a/tests/test_correlograms.py +++ b/tests/test_correlograms.py @@ -1,4 +1,3 @@ - """Tests of correlograms for `pynapple` package.""" import pynapple as nap @@ -7,7 +6,6 @@ import pytest from itertools import combinations - def test_cross_correlogram(): t1 = np.array([0]) t2 = np.array([1]) @@ -32,167 +30,489 @@ def test_cross_correlogram(): for t in [100, 200, 1000]: np.testing.assert_array_almost_equal( - nap.process.correlograms._cross_correlogram(np.arange(0, t), np.arange(0, t), 1, t)[0], + nap.process.correlograms._cross_correlogram( + np.arange(0, t), np.arange(0, t), 1, t + )[0], np.hstack( (np.arange(0, 1, 1 / t), np.ones(1), np.arange(0, 1, 1 / t)[::-1]) ), ) +############################# +# Type Error +############################# +def get_group(): + return nap.TsGroup( + { + 0: nap.Ts(t=np.arange(0, 100)), + # 1: nap.Ts(t=np.arange(0, 100)), + # 2: nap.Ts(t=np.array([0, 10])), + # 3: nap.Ts(t=np.arange(0, 200)), + }, + time_support=nap.IntervalSet(0, 100), + ) + + +def get_ep(): + return nap.IntervalSet(start=0, end=100) + + +def get_event(): + return nap.Ts(t=np.arange(0, 100), time_support=nap.IntervalSet(0, 100)) + + @pytest.mark.parametrize( - "group", + "func", [ - nap.TsGroup( - { - 0: nap.Ts(t=np.arange(0, 100)), - 1: nap.Ts(t=np.arange(0, 100)), - 2: nap.Ts(t=np.array([0, 10])), - 3: nap.Ts(t=np.arange(0, 200)), - } + # nap.compute_autocorrelogram, + # nap.compute_crosscorrelogram, + nap.compute_eventcorrelogram + ], +) +@pytest.mark.parametrize( + "group, binsize, windowsize, ep, norm, time_units, msg", + [ + ( + get_group(), + "a", + 10, + get_ep(), + True, + "s", + "Invalid type. Parameter binsize must be of type .", + ), + ( + get_group(), + 1, + "a", + get_ep(), + True, + "s", + "Invalid type. Parameter windowsize must be of type .", + ), + ( + get_group(), + 1, + 10, + "a", + True, + "s", + "Invalid type. Parameter ep must be of type .", + ), + ( + get_group(), + 1, + 10, + get_ep(), + "a", + "s", + "Invalid type. Parameter norm must be of type .", + ), + ( + get_group(), + 1, + 10, + get_ep(), + True, + 1, + "Invalid type. Parameter time_units must be of type .", + ), + ], +) +def test_correlograms_type_errors( + func, group, binsize, windowsize, ep, norm, time_units, msg +): + with pytest.raises(TypeError, match=msg): + func( + group=group, + binsize=binsize, + windowsize=windowsize, + ep=ep, + norm=norm, + time_units=time_units, ) + + +@pytest.mark.parametrize( + "func, args, msg", + [ + ( + nap.compute_autocorrelogram, + ([1, 2, 3], 1, 1), + "Invalid type. Parameter group must be of type TsGroup", + ), + ( + nap.compute_crosscorrelogram, + ([1, 2, 3], 1, 1), + r"Invalid type. Parameter group must be of type TsGroup or a tuple\/list of \(TsGroup, TsGroup\).", + ), + ( + nap.compute_crosscorrelogram, + (([1, 2, 3]), 1, 1), + r"Invalid type. Parameter group must be of type TsGroup or a tuple\/list of \(TsGroup, TsGroup\).", + ), + ( + nap.compute_crosscorrelogram, + ((get_group(), [1, 2, 3]), 1, 1), + r"Invalid type. Parameter group must be of type TsGroup or a tuple\/list of \(TsGroup, TsGroup\).", + ), + ( + nap.compute_crosscorrelogram, + ((get_group(), get_group(), get_group()), 1, 1), + r"Invalid type. Parameter group must be of type TsGroup or a tuple\/list of \(TsGroup, TsGroup\).", + ), + ( + nap.compute_eventcorrelogram, + ([1, 2, 3], 1, 1), + "Invalid type. Parameter group must be of type TsGroup", + ), ], ) -class Test_Correlograms: - def test_autocorrelogram(self, group): - cc = nap.compute_autocorrelogram(group, 1, 100, norm=False) - assert isinstance(cc, pd.DataFrame) - assert list(cc.keys()) == list(group.keys()) - np.testing.assert_array_almost_equal(cc.index.values, np.arange(-100, 101, 1)) - np.testing.assert_array_almost_equal( - cc[0].values, +def test_correlograms_type_errors_group(func, args, msg): + with pytest.raises(TypeError, match=msg): + func(*args) + + +@pytest.mark.parametrize( + "func, args, msg", + [ + ( + nap.compute_eventcorrelogram, + (get_group(), [1, 2, 3], 1, 1), + "Invalid type. Parameter event must be of type \(, \).", + ), + ], +) +def test_correlograms_type_errors_event(func, args, msg): + with pytest.raises(TypeError, match=msg): + func(*args) + + +################################################# +# Normal tests +################################################# + + +@pytest.mark.parametrize( + "group, binsize, windowsize, kwargs, expected", + [ + ( + get_group(), + 1, + 100, + {}, np.hstack( (np.arange(0, 1, 1 / 100), np.zeros(1), np.arange(0, 1, 1 / 100)[::-1]) - ), - ) - np.testing.assert_array_almost_equal( - cc[0].values, + )[:, np.newaxis], + ), + ( + get_group(), + 1, + 100, + {"norm": False}, np.hstack( (np.arange(0, 1, 1 / 100), np.zeros(1), np.arange(0, 1, 1 / 100)[::-1]) - ), - ) - tmp = np.zeros(len(cc)) - tmp[[90, 110]] = 0.5 - np.testing.assert_array_almost_equal(tmp, cc[2]) - - def test_autocorrelogram_error(self, group): - with pytest.raises(RuntimeError) as e_info: - nap.compute_autocorrelogram([1,2,3], 1, 100, norm=False) - assert str(e_info.value) == "Unknown format for group" - - def test_autocorrelogram_with_ep(self, group): - ep = nap.IntervalSet(start=0, end=99) - cc = nap.compute_autocorrelogram(group, 1, 100, ep=ep, norm=False) - np.testing.assert_array_almost_equal(cc[0].values, cc[3].values) - - def test_autocorrelogram_with_norm(self, group): - cc = nap.compute_autocorrelogram(group, 1, 100, norm=False) - cc2 = nap.compute_autocorrelogram(group, 1, 100, norm=True) - tmp = group._metadata["rate"].values.astype("float") - np.testing.assert_array_almost_equal(cc / tmp, cc2) - - def test_autocorrelogram_time_units(self, group): - cc = nap.compute_autocorrelogram(group, 1, 100, time_units="s") - cc2 = nap.compute_autocorrelogram(group, 1 * 1e3, 100 * 1e3, time_units="ms") - cc3 = nap.compute_autocorrelogram(group, 1 * 1e6, 100 * 1e6, time_units="us") - pd.testing.assert_frame_equal(cc, cc2) - pd.testing.assert_frame_equal(cc, cc3) - - def test_crosscorrelogram(self, group): - cc = nap.compute_crosscorrelogram(group, 1, 100, norm=False) - assert isinstance(cc, pd.DataFrame) - assert list(cc.keys()) == list(combinations(group.keys(), 2)) - np.testing.assert_array_almost_equal(cc.index.values, np.arange(-100, 101, 1)) - np.testing.assert_array_almost_equal( - cc[(0, 1)].values, + )[:, np.newaxis], + ), + ( + nap.TsGroup({1: nap.Ts(t=np.array([0, 10]))}), + 1, + 100, + {"norm": False}, np.hstack( - (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) - ), + ( + np.zeros(90), + np.array([0.5]), + np.zeros((19)), + np.array([0.5]), + np.zeros((90)), + ) + )[:, np.newaxis], + ), + ( + get_group(), + 1, + 100, + {"ep": get_ep()}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.zeros(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + 1, + 100, + {"time_units": "s"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.zeros(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + 1 * 1e3, + 100 * 1e3, + {"time_units": "ms"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.zeros(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + 1 * 1e6, + 100 * 1e6, + {"time_units": "us"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.zeros(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ], +) +def test_autocorrelogram(group, binsize, windowsize, kwargs, expected): + cc = nap.compute_autocorrelogram(group, binsize, windowsize, **kwargs) + assert isinstance(cc, pd.DataFrame) + assert list(cc.keys()) == list(group.keys()) + if "time_units" in kwargs: + if kwargs["time_units"] == "ms": + np.testing.assert_array_almost_equal( + cc.index.values * 1e3, + np.arange(-windowsize, windowsize + binsize, binsize), + ) + if kwargs["time_units"] == "us": + np.testing.assert_array_almost_equal( + cc.index.values * 1e6, + np.arange(-windowsize, windowsize + binsize, binsize), + ) + if kwargs["time_units"] == "s": + np.testing.assert_array_almost_equal( + cc.index.values, np.arange(-windowsize, windowsize + binsize, binsize) + ) + else: + np.testing.assert_array_almost_equal( + cc.index.values, np.arange(-windowsize, windowsize + binsize, binsize) ) + np.testing.assert_array_almost_equal(cc.values, expected) - def test_crosscorrelogram_reverse(self, group): - cc = nap.compute_crosscorrelogram(group, 1, 100, reverse=True) - assert isinstance(cc, pd.DataFrame) - - from itertools import combinations - pairs = list(combinations(group.index, 2)) - pairs = list(map(lambda n: (n[1], n[0]), pairs)) - - assert pairs == list(cc.keys()) - - def test_crosscorrelogram_with_ep(self, group): - ep = nap.IntervalSet(start=0, end=99) - cc = nap.compute_crosscorrelogram(group, 1, 100, ep=ep, norm=False) - np.testing.assert_array_almost_equal(cc[(0, 1)].values, cc[(0, 3)].values) - - def test_crosscorrelogram_with_norm(self, group): - cc = nap.compute_crosscorrelogram(group, 1, 100, norm=False) - cc2 = nap.compute_crosscorrelogram(group, 1, 100, norm=True) - tmp = group._metadata["rate"].values.astype("float") - tmp = tmp[[t[1] for t in cc.columns]] - np.testing.assert_array_almost_equal(cc / tmp, cc2) - - def test_crosscorrelogram_time_units(self, group): - cc = nap.compute_crosscorrelogram(group, 1, 100, time_units="s") - cc2 = nap.compute_crosscorrelogram(group, 1 * 1e3, 100 * 1e3, time_units="ms") - cc3 = nap.compute_crosscorrelogram(group, 1 * 1e6, 100 * 1e6, time_units="us") - pd.testing.assert_frame_equal(cc, cc2) - pd.testing.assert_frame_equal(cc, cc3) +@pytest.mark.parametrize( + "group, event, binsize, windowsize, kwargs, expected", + [ + ( + get_group(), + get_event(), + 1, + 100, + {}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + get_event(), + 1, + 100, + {"norm": False}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + get_event(), + 1, + 100, + {"ep": get_ep()}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + get_event(), + 1, + 100, + {"time_units": "s"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + get_event(), + 1 * 1e3, + 100 * 1e3, + {"time_units": "ms"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group(), + get_event(), + 1 * 1e6, + 100 * 1e6, + {"time_units": "us"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ], +) +def test_eventcorrelogram(group, event, binsize, windowsize, kwargs, expected): + cc = nap.compute_eventcorrelogram(group, event, binsize, windowsize, **kwargs) + assert isinstance(cc, pd.DataFrame) + assert list(cc.keys()) == list(group.keys()) + if "time_units" in kwargs: + if kwargs["time_units"] == "ms": + np.testing.assert_array_almost_equal( + cc.index.values * 1e3, + np.arange(-windowsize, windowsize + binsize, binsize), + ) + if kwargs["time_units"] == "us": + np.testing.assert_array_almost_equal( + cc.index.values * 1e6, + np.arange(-windowsize, windowsize + binsize, binsize), + ) + if kwargs["time_units"] == "s": + np.testing.assert_array_almost_equal( + cc.index.values, np.arange(-windowsize, windowsize + binsize, binsize) + ) + else: + np.testing.assert_array_almost_equal( + cc.index.values, np.arange(-windowsize, windowsize + binsize, binsize) + ) + np.testing.assert_array_almost_equal(cc.values, expected) - def test_crosscorrelogram_error(self, group): - with pytest.raises(RuntimeError) as e_info: - nap.compute_crosscorrelogram([1,2,3], 1, 100) - assert str(e_info.value) == "Unknown format for group" - def test_crosscorrelogram_with_tuple(self, group): - from itertools import product - groups = (group[[0,1]], group[[2,3]]) - cc = nap.compute_crosscorrelogram(groups, 1, 100, norm=False) +def get_group2(): + return nap.TsGroup( + { + 0: nap.Ts(t=np.arange(0, 100)), + 1: nap.Ts(t=np.arange(0, 100)), + # 2: nap.Ts(t=np.array([0, 10])), + # 3: nap.Ts(t=np.arange(0, 200)), + }, + time_support=nap.IntervalSet(0, 100), + ) - assert isinstance(cc, pd.DataFrame) - assert list(cc.keys()) == list(product(groups[0].keys(), groups[1].keys())) - np.testing.assert_array_almost_equal(cc.index.values, np.arange(-100, 101, 1)) - cc2 = nap.compute_crosscorrelogram(group[[0,2]], 1, 100, norm=False) - np.testing.assert_array_almost_equal( - cc[(0, 2)].values, - cc2[(0,2)].values +@pytest.mark.parametrize( + "group, binsize, windowsize, kwargs, expected", + [ + ( + get_group2(), + 1, + 100, + {}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group2(), + 1, + 100, + {"norm": False}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + (get_group(), get_group()), + 1, + 100, + {}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group2(), + 1, + 100, + {"ep": get_ep()}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + (get_group(), get_group()), + 1, + 100, + {"ep": get_ep()}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + (get_group(), get_group()), + 1, + 100, + {"norm": False}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group2(), + 1, + 100, + {"time_units": "s"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group2(), + 1 * 1e3, + 100 * 1e3, + {"time_units": "ms"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ( + get_group2(), + 1 * 1e6, + 100 * 1e6, + {"time_units": "us"}, + np.hstack( + (np.arange(0, 1, 1 / 100), np.ones(1), np.arange(0, 1, 1 / 100)[::-1]) + )[:, np.newaxis], + ), + ], +) +def test_crosscorrelogram(group, binsize, windowsize, kwargs, expected): + cc = nap.compute_crosscorrelogram(group, binsize, windowsize, **kwargs) + assert isinstance(cc, pd.DataFrame) + if isinstance(group, nap.TsGroup): + assert list(cc.keys()) == list(combinations(group.keys(), 2)) + else: + assert list(cc.keys()) == [(0, 0)] + if "time_units" in kwargs: + if kwargs["time_units"] == "ms": + np.testing.assert_array_almost_equal( + cc.index.values * 1e3, + np.arange(-windowsize, windowsize + binsize, binsize), ) - - def test_eventcorrelogram(self, group): - cc = nap.compute_eventcorrelogram(group, group[0], 1, 100, norm=False) - cc2 = nap.compute_crosscorrelogram(group, 1, 100, norm=False) - assert isinstance(cc, pd.DataFrame) - assert list(cc.keys()) == list(group.keys()) - np.testing.assert_array_almost_equal(cc[1].values, cc2[(0, 1)].values) - - def test_eventcorrelogram_with_ep(self, group): - ep = nap.IntervalSet(start=0, end=99) - cc = nap.compute_eventcorrelogram(group, group[0], 1, 100, ep=ep, norm=False) - cc2 = nap.compute_crosscorrelogram(group, 1, 100, ep=ep, norm=False) - assert isinstance(cc, pd.DataFrame) - assert list(cc.keys()) == list(group.keys()) - np.testing.assert_array_almost_equal(cc[1].values, cc2[(0, 1)].values) - - def test_eventcorrelogram_with_norm(self, group): - cc = nap.compute_eventcorrelogram(group, group[0], 1, 100, norm=False) - cc2 = nap.compute_eventcorrelogram(group, group[0], 1, 100, norm=True) - # tmp = group._metadata["rate"].values.astype("float") - tmp = group.get_info("rate").values - np.testing.assert_array_almost_equal(cc / tmp, cc2) - - def test_eventcorrelogram_time_units(self, group): - cc = nap.compute_eventcorrelogram(group, group[0], 1, 100, time_units="s") - cc2 = nap.compute_eventcorrelogram( - group, group[0], 1 * 1e3, 100 * 1e3, time_units="ms" - ) - cc3 = nap.compute_eventcorrelogram( - group, group[0], 1 * 1e6, 100 * 1e6, time_units="us" + if kwargs["time_units"] == "us": + np.testing.assert_array_almost_equal( + cc.index.values * 1e6, + np.arange(-windowsize, windowsize + binsize, binsize), + ) + if kwargs["time_units"] == "s": + np.testing.assert_array_almost_equal( + cc.index.values, np.arange(-windowsize, windowsize + binsize, binsize) + ) + else: + np.testing.assert_array_almost_equal( + cc.index.values, np.arange(-windowsize, windowsize + binsize, binsize) ) - pd.testing.assert_frame_equal(cc, cc2) - pd.testing.assert_frame_equal(cc, cc3) + np.testing.assert_array_almost_equal(cc.values, expected) + - def test_eventcorrelogram_error(self, group): - with pytest.raises(RuntimeError) as e_info: - nap.compute_eventcorrelogram([1,2,3], group[0], 1, 100) - assert str(e_info.value) == "Unknown format for group" +def test_crosscorrelogram_reverse(): + cc = nap.compute_crosscorrelogram(get_group2(), 1, 100, reverse=True) + assert isinstance(cc, pd.DataFrame) + assert list(cc.keys()) == [(1, 0)] diff --git a/tests/test_power_spectral_density.py b/tests/test_power_spectral_density.py index fc76103c..92427e98 100644 --- a/tests/test_power_spectral_density.py +++ b/tests/test_power_spectral_density.py @@ -13,14 +13,16 @@ # Test for power_spectral_density ############################################################ -def get_sorted_fft(data,fs): + +def get_sorted_fft(data, fs): fft = np.fft.fft(data, axis=0) fft_freq = np.fft.fftfreq(len(data), 1 / fs) order = np.argsort(fft_freq) - if fft.ndim==1: - fft = fft[:,np.newaxis] + if fft.ndim == 1: + fft = fft[:, np.newaxis] return fft_freq[order], fft[order] + def test_compute_power_spectral_density(): t = np.linspace(0, 1, 1000) @@ -30,11 +32,11 @@ def test_compute_power_spectral_density(): assert r.shape[0] == 500 a, b = get_sorted_fft(sig.values, sig.rate) - np.testing.assert_array_almost_equal(r.index.values, a[a>=0]) - np.testing.assert_array_almost_equal(r.values, b[a>=0]) - + np.testing.assert_array_almost_equal(r.index.values, a[a >= 0]) + np.testing.assert_array_almost_equal(r.values, b[a >= 0]) + r = nap.compute_power_spectral_density(sig, norm=True) - np.testing.assert_array_almost_equal(r.values, b[a>=0]/len(sig)) + np.testing.assert_array_almost_equal(r.values, b[a >= 0] / len(sig)) sig = nap.TsdFrame(d=np.random.random((1000, 4)), t=t) r = nap.compute_power_spectral_density(sig) @@ -42,8 +44,8 @@ def test_compute_power_spectral_density(): assert r.shape == (500, 4) a, b = get_sorted_fft(sig.values, sig.rate) - np.testing.assert_array_almost_equal(r.index.values, a[a>=0]) - np.testing.assert_array_almost_equal(r.values, b[a>=0]) + np.testing.assert_array_almost_equal(r.index.values, a[a >= 0]) + np.testing.assert_array_almost_equal(r.values, b[a >= 0]) sig = nap.TsdFrame(d=np.random.random((1000, 4)), t=t) r = nap.compute_power_spectral_density(sig, full_range=True) @@ -52,7 +54,7 @@ def test_compute_power_spectral_density(): a, b = get_sorted_fft(sig.values, sig.rate) np.testing.assert_array_almost_equal(r.index.values, a) - np.testing.assert_array_almost_equal(r.values, b) + np.testing.assert_array_almost_equal(r.values, b) t = np.linspace(0, 1, 1000) sig = nap.Tsd(d=np.random.random(1000), t=t) @@ -68,7 +70,7 @@ def test_compute_power_spectral_density(): @pytest.mark.parametrize( - "sig, fs, ep, full_range, norm, expectation", + "sig, kwargs, expectation", [ ( nap.Tsd( @@ -76,10 +78,7 @@ def test_compute_power_spectral_density(): t=np.linspace(0, 1, 1000), time_support=nap.IntervalSet(start=[0.1, 0.6], end=[0.2, 0.81]), ), - 1000, - None, - False, - False, + {}, pytest.raises( ValueError, match=re.escape( @@ -89,66 +88,57 @@ def test_compute_power_spectral_density(): ), ( nap.Tsd(d=np.random.random(1000), t=np.linspace(0, 1, 1000)), - 1000, - "not_ep", - False, - False, + {"ep": "not_ep"}, pytest.raises( TypeError, - match="ep param must be a pynapple IntervalSet object, or None", + match=re.escape( + "Invalid type. Parameter ep must be of type ." + ), ), ), ( nap.Tsd(d=np.random.random(1000), t=np.linspace(0, 1, 1000)), - "a", - None, - False, - False, + {"fs": "a"}, pytest.raises( TypeError, - match="fs must be of type float or int", + match="Invalid type. Parameter fs must be of type .", ), ), ( "not_a_tsd", - 1000, - None, - False, - False, + {}, pytest.raises( TypeError, - match="sig must be either a Tsd or a TsdFrame object.", + match=re.escape( + "Invalid type. Parameter sig must be of type (, )." + ), ), ), ( nap.Tsd(d=np.random.random(1000), t=np.linspace(0, 1, 1000)), - 1000, - None, - "a", - False, + {"full_range": "a"}, pytest.raises( TypeError, - match="full_range must be of type bool or None", + match=re.escape( + "Invalid type. Parameter full_range must be of type ." + ), ), ), ( nap.Tsd(d=np.random.random(1000), t=np.linspace(0, 1, 1000)), - 1000, - None, - False, - "a", + {"norm": "a"}, pytest.raises( TypeError, - match="norm must be of type bool", + match=re.escape( + "Invalid type. Parameter norm must be of type ." + ), ), - ), + ), ], ) -def test_compute_power_spectral_density_raise_errors( - sig, fs, ep, full_range, norm, expectation -): +def test_compute_power_spectral_density_raise_errors(sig, kwargs, expectation): with expectation: - psd = nap.compute_power_spectral_density(sig, fs, ep, full_range, norm) + nap.compute_power_spectral_density(sig, **kwargs) ############################################################ @@ -156,13 +146,22 @@ def test_compute_power_spectral_density_raise_errors( ############################################################ -def get_signal_and_output(f=2, fs=1000, duration=100, interval_size=10): +def get_signal_and_output(f=2, fs=1000, duration=100, interval_size=10, overlap = 0.25): t = np.arange(0, duration, 1 / fs) d = np.cos(2 * np.pi * f * t) sig = nap.Tsd(t=t, d=d, time_support=nap.IntervalSet(0, 100)) - tmp = d.reshape((int(duration / interval_size), int(fs * interval_size))).T + tmp = [] + slice = (0, int(fs*interval_size)+1) + while slice[1] < len(d): + tmp.append(d[slice[0]:slice[1]]) + new_slice = (slice[1]-int(fs*interval_size*overlap)-1, slice[1]+int(fs*interval_size*(1-overlap))) + slice = new_slice + + tmp = np.transpose(np.array(tmp)) + print(tmp.shape) + # tmp = d.reshape((int(duration / interval_size), int(fs * interval_size))).T # tmp = tmp[0:-1] - tmp = tmp*signal.windows.hamming(tmp.shape[0])[:,np.newaxis] + tmp = tmp * signal.windows.hamming(tmp.shape[0])[:, np.newaxis] out = np.sum(np.fft.fft(tmp, axis=0), 1) freq = np.fft.fftfreq(out.shape[0], 1 / fs) order = np.argsort(freq) @@ -190,10 +189,11 @@ def test_compute_mean_power_spectral_density(): psd = nap.compute_mean_power_spectral_density(sig, 10, norm=True) assert isinstance(psd, pd.DataFrame) assert psd.shape[0] > 0 # Check that the psd DataFrame is not empty - np.testing.assert_array_almost_equal(psd.values.flatten(), out[freq >= 0]/(10000.0*10.0)) + np.testing.assert_array_almost_equal( + psd.values.flatten(), out[freq >= 0] / (10001.0 * 12.0) + ) np.testing.assert_array_almost_equal(psd.index.values, freq[freq >= 0]) - # TsdFrame sig2 = nap.TsdFrame( t=sig.t, d=np.repeat(sig.values[:, None], 2, 1), time_support=sig.time_support @@ -216,106 +216,123 @@ def test_compute_mean_power_spectral_density(): @pytest.mark.parametrize( - "sig, out, freq, interval_size, fs, ep, full_range, norm, time_units, expectation", + "sig, interval_size, kwargs, expectation", [ - (*get_signal_and_output(), 10, None, None, False, False, "s", does_not_raise()), + (get_signal_and_output()[0], 10, {}, does_not_raise()), ( - "a", *get_signal_and_output()[1:], + "a", 10, - None, - None, - False, - False, - "s", - pytest.raises(TypeError, match="sig must be either a Tsd or a TsdFrame object."), + {}, + pytest.raises( + TypeError, + match=re.escape( + "Invalid type. Parameter sig must be of type (, )." + ), + ), ), ( - *get_signal_and_output(), + get_signal_and_output()[0], 10, - "a", - None, - False, - False, - "s", - pytest.raises(TypeError, match="fs must be of type float or int"), + {"fs": "a"}, + pytest.raises( + TypeError, + match=re.escape( + "Invalid type. Parameter fs must be of type ." + ), + ), ), ( - *get_signal_and_output(), + get_signal_and_output()[0], 10, - None, - "a", - False, - False, - "s", + {"ep": "a"}, pytest.raises( TypeError, - match="ep param must be a pynapple IntervalSet object, or None", + match=re.escape( + "Invalid type. Parameter ep must be of type ." + ), ), ), ( - *get_signal_and_output(), + get_signal_and_output()[0], 10, - None, - None, - "a", - False, - "s", - pytest.raises(TypeError, match="full_range must be of type bool or None"), + {"ep": "a"}, + pytest.raises( + TypeError, + match=re.escape( + "Invalid type. Parameter ep must be of type ." + ), + ), ), ( - *get_signal_and_output(), + get_signal_and_output()[0], 10, - None, # FS - None, # Ep - "a", # full_range - False, # Norm - "s", # Time units - pytest.raises(TypeError, match="full_range must be of type bool or None"), + {"norm": "a"}, + pytest.raises( + TypeError, + match=re.escape( + "Invalid type. Parameter norm must be of type ." + ), + ), ), ( - *get_signal_and_output(), - 10, - None, # FS - None, # Ep - False, # full_range - "a", # Norm - "s", # Time units - pytest.raises(TypeError, match="norm must be of type bool"), - ), - (*get_signal_and_output(), 10 * 1e3, None, None, False, False, "ms", does_not_raise()), - (*get_signal_and_output(), 10 * 1e6, None, None, False, False, "us", does_not_raise()), + get_signal_and_output()[0], + 10, + {"overlap": "a"}, + pytest.raises( + TypeError, + match=re.escape( + "Invalid type. Parameter overlap must be of type ." + ), + ), + ), ( - *get_signal_and_output(), + get_signal_and_output()[0], 200, - None, - None, - False, - False, - "s", + {}, pytest.raises( RuntimeError, - match="Splitting epochs with interval_size=200 generated an empty IntervalSet. Try decreasing interval_size", + match=re.escape( + "Splitting epochs with interval_size=200 generated an empty IntervalSet. Try decreasing interval_size" + ), ), ), ( - *get_signal_and_output(), + get_signal_and_output()[0], 10, - None, - nap.IntervalSet([0, 200], [100, 300]), - False, - False, - "s", + {"ep": nap.IntervalSet([0, 200], [100, 300])}, pytest.raises( RuntimeError, - match="One interval doesn't have any signal associated. Check the parameter ep or the time support if no epoch is passed.", + match=re.escape( + "One interval doesn't have any signal associated. Check the parameter ep or the time support if no epoch is passed." + ), ), ), + ( + get_signal_and_output()[0], + 10, + {"overlap": -0.1}, + pytest.raises( + ValueError, + match=re.escape( + "Overlap should be in intervals [0.0, 1.0)." + ), + ), + ), + ( + get_signal_and_output()[0], + 10, + {"overlap": 1.1}, + pytest.raises( + ValueError, + match=re.escape( + "Overlap should be in intervals [0.0, 1.0)." + ), + ), + ), ], ) def test_compute_mean_power_spectral_density_raise_errors( - sig, out, freq, interval_size, fs, ep, full_range, norm, time_units, expectation + sig, interval_size, kwargs, expectation ): with expectation: - psd = nap.compute_mean_power_spectral_density( - sig, interval_size, fs, ep, full_range, norm, time_units - ) + nap.compute_mean_power_spectral_density(sig, interval_size, **kwargs) diff --git a/tests/test_tuning_curves.py b/tests/test_tuning_curves.py index 2c9c9144..9610c0cf 100644 --- a/tests/test_tuning_curves.py +++ b/tests/test_tuning_curves.py @@ -1,9 +1,3 @@ -# -*- coding: utf-8 -*- -# @Author: gviejo -# @Date: 2022-03-30 11:16:30 -# @Last Modified by: Guillaume Viejo -# @Last Modified time: 2024-01-29 11:05:11 - """Tests of tuning curves for `pynapple` package.""" from contextlib import nullcontext as does_not_raise import pynapple as nap @@ -11,396 +5,328 @@ import pandas as pd import pytest -def test_compute_discrete_tuning_curves(): - tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) - dict_ep = { 0:nap.IntervalSet(start=0, end=50), - 1:nap.IntervalSet(start=50, end=100)} - tc = nap.compute_discrete_tuning_curves(tsgroup, dict_ep) - assert len(tc) == 2 - assert list(tc.columns) == list(tsgroup.keys()) - np.testing.assert_array_almost_equal(tc.index.values, np.array(list(dict_ep.keys()))) - np.testing.assert_almost_equal(tc.loc[0,0], 51/50) - np.testing.assert_almost_equal(tc.loc[1,0], 1) - -def test_compute_discrete_tuning_curves_with_strings(): - tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) - dict_ep = { "0":nap.IntervalSet(start=0, end=50), - "1":nap.IntervalSet(start=50, end=100)} - tc = nap.compute_discrete_tuning_curves(tsgroup, dict_ep) - assert len(tc) == 2 - assert list(tc.columns) == list(tsgroup.keys()) - assert list(tc.index) == list(dict_ep.keys()) - np.testing.assert_almost_equal(tc.loc["0",0], 51/50) - np.testing.assert_almost_equal(tc.loc["1",0], 1) - -def test_compute_discrete_tuning_curves_error(): - dict_ep = { "0":nap.IntervalSet(start=0, end=50), - "1":nap.IntervalSet(start=50, end=100)} - with pytest.raises(AssertionError) as e_info: - nap.compute_discrete_tuning_curves([1,2,3], dict_ep) - assert str(e_info.value) == "group should be a TsGroup." - - tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) - dict_ep = { "0":nap.IntervalSet(start=0, end=50), - "1":nap.IntervalSet(start=50, end=100)} - k = [1,2,3] - dict_ep["2"] = k - with pytest.raises(AssertionError) as e_info: - nap.compute_discrete_tuning_curves(tsgroup, dict_ep) - assert str(e_info.value) == "dict_ep argument should contain only IntervalSet. Key 2 in dict_ep is not an IntervalSet" - -def test_compute_1d_tuning_curves(): - tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - tc = nap.compute_1d_tuning_curves(tsgroup, feature, nb_bins=10) - - assert len(tc) == 10 - assert list(tc.columns) == list(tsgroup.keys()) - np.testing.assert_array_almost_equal(tc[0].values[1:], np.zeros(9)) - assert int(tc[0].values[0]) == 10 - -def test_compute_1d_tuning_curves_error(): - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - with pytest.raises(AssertionError) as e_info: - nap.compute_1d_tuning_curves([1,2,3], feature, nb_bins=10) - assert str(e_info.value) == "group should be a TsGroup." - -def test_compute_1d_tuning_curves_with_ep(): - tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - tc1 = nap.compute_1d_tuning_curves(tsgroup, feature, nb_bins=10) - ep = nap.IntervalSet(start=0, end=50) - tc2 = nap.compute_1d_tuning_curves(tsgroup, feature, nb_bins=10, ep=ep) - pd.testing.assert_frame_equal(tc1, tc2) - - -def test_compute_1d_tuning_curves_with_min_max(): - tsgroup = nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - tc = nap.compute_1d_tuning_curves(tsgroup, feature, nb_bins=10, minmax=(0, 1)) - assert len(tc) == 10 - np.testing.assert_array_almost_equal(tc[0].values[1:], np.zeros(9)) - assert tc[0].values[0] >= 9 - - -def test_compute_2d_tuning_curves(): - tsgroup = nap.TsGroup( - {0: nap.Ts(t=np.arange(0, 100, 10)), 1: nap.Ts(t=np.array([50, 149]))} - ) +######################## +# Type Error +######################## +def get_group(): + return nap.TsGroup({0: nap.Ts(t=np.arange(0, 100))}) +def get_feature(): + return nap.Tsd( + t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0, + time_support = nap.IntervalSet(0, 100) + ) +def get_features(): tmp = np.vstack( (np.repeat(np.arange(0, 100), 10), np.tile(np.arange(0, 100), 10)) ).T - features = nap.TsdFrame(t=np.arange(0, 200, 0.1), d=np.vstack((tmp, tmp[::-1]))) - tc, xy = nap.compute_2d_tuning_curves(tsgroup, features, 10) + return nap.TsdFrame( + t=np.arange(0, 200, 0.1), d=np.vstack((tmp, tmp[::-1])), + time_support = nap.IntervalSet(0, 200) + ) +def get_ep(): + return nap.IntervalSet(start=0, end=50) +def get_tsdframe(): + return nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 2))) + +@pytest.mark.parametrize("group, dict_ep, expected_exception", [ + ("a", {0:nap.IntervalSet(start=0, end=50),1:nap.IntervalSet(start=50, end=100)}, pytest.raises(TypeError,match="group should be a TsGroup.")), + (get_group(), "a", pytest.raises(TypeError,match="dict_ep should be a dictionary of IntervalSet")), + (get_group(), {0:"a",1:nap.IntervalSet(start=50, end=100)}, pytest.raises(TypeError,match="dict_ep argument should contain only IntervalSet.")), +]) +def test_compute_discrete_tuning_curves_errors(group, dict_ep, expected_exception): + with expected_exception: + nap.compute_discrete_tuning_curves(group, dict_ep) + +@pytest.mark.parametrize("group, feature, nb_bins, ep, minmax, expected_exception", [ + ("a",get_feature(), 10, get_ep(), (0, 1), "group should be a TsGroup."), + (get_group(),"a", 10, get_ep(), (0, 1), r"feature should be a Tsd \(or TsdFrame with 1 column only\)"), + (get_group(),get_feature(), "a", get_ep(), (0, 1), r"nb_bins should be of type int \(or tuple with \(int, int\) for 2D tuning curves\)."), + (get_group(),get_feature(), 10, "a", (0, 1), r"ep should be an IntervalSet"), + (get_group(),get_feature(), 10, get_ep(), 1, r"minmax should be a tuple\/list of 2 numbers"), +]) +def test_compute_1d_tuning_curves_errors(group, feature, nb_bins, ep, minmax, expected_exception): + with pytest.raises(TypeError, match=expected_exception): + nap.compute_1d_tuning_curves(group, feature, nb_bins, ep, minmax) + +@pytest.mark.parametrize("group, features, nb_bins, ep, minmax, expected_exception", [ + ("a",get_features(), 10, get_ep(), (0, 1), "group should be a TsGroup."), + (get_group(),"a", 10, get_ep(), (0, 1), r"features should be a TsdFrame with 2 columns"), + (get_group(),get_features(), "a", get_ep(), (0, 1), r"nb_bins should be of type int \(or tuple with \(int, int\) for 2D tuning curves\)."), + (get_group(),get_features(), 10, "a", (0, 1), r"ep should be an IntervalSet"), + (get_group(),get_features(), 10, get_ep(), 1, r"minmax should be a tuple\/list of 2 numbers"), +]) +def test_compute_2d_tuning_curves_errors(group, features, nb_bins, ep, minmax, expected_exception): + with pytest.raises(TypeError, match=expected_exception): + nap.compute_2d_tuning_curves(group, features, nb_bins, ep, minmax) + +@pytest.mark.parametrize("tc, feature, ep, minmax, bitssec, expected_exception", [ + ("a", get_feature(), get_ep(), (0, 1), True, "Argument tc should be of type pandas.DataFrame or numpy.ndarray"), + (pd.DataFrame(),"a", get_ep(), (0, 1), True, r"feature should be a Tsd \(or TsdFrame with 1 column only\)"), + (pd.DataFrame(), get_feature(), "a", (0, 1), True, r"ep should be an IntervalSet"), + (pd.DataFrame(), get_feature(), get_ep(), 1, True, r"minmax should be a tuple\/list of 2 numbers"), + (pd.DataFrame(), get_feature(), get_ep(), (0,1), "a", r"Argument bitssec should be of type bool"), +]) +def test_compute_1d_mutual_info_errors(tc, feature, ep, minmax, bitssec, expected_exception): + with pytest.raises(TypeError, match=expected_exception): + nap.compute_1d_mutual_info(tc, feature, ep, minmax, bitssec) + +@pytest.mark.parametrize("dict_tc, features, ep, minmax, bitssec, expected_exception", [ + ("a", get_features(), get_ep(), (0, 1), True, "Argument dict_tc should be a dictionary of numpy.ndarray or numpy.ndarray"), + ({0:np.zeros((2,2))},"a", get_ep(), (0, 1), True, r"features should be a TsdFrame with 2 columns"), + ({0:np.zeros((2,2))}, get_features(), "a", (0, 1), True, r"ep should be an IntervalSet"), + ({0:np.zeros((2,2))}, get_features(), get_ep(), 1, True, r"minmax should be a tuple\/list of 2 numbers"), + ({0:np.zeros((2,2))}, get_features(), get_ep(), (0,1), "a", r"Argument bitssec should be of type bool"), +]) +def test_compute_2d_mutual_info_errors(dict_tc, features, ep, minmax, bitssec, expected_exception): + with pytest.raises(TypeError, match=expected_exception): + nap.compute_2d_mutual_info(dict_tc, features, ep, minmax, bitssec) + +@pytest.mark.parametrize("tsdframe, feature, nb_bins, ep, minmax, expected_exception", [ + ("a",get_feature(), 10, get_ep(), (0, 1), "Argument tsdframe should be of type Tsd or TsdFrame."), + (get_tsdframe(),"a", 10, get_ep(), (0, 1), r"feature should be a Tsd \(or TsdFrame with 1 column only\)"), + (get_tsdframe(),get_feature(), "a", get_ep(), (0, 1), r"nb_bins should be of type int \(or tuple with \(int, int\) for 2D tuning curves\)."), + (get_tsdframe(),get_feature(), 10, "a", (0, 1), r"ep should be an IntervalSet"), + (get_tsdframe(),get_feature(), 10, get_ep(), 1, r"minmax should be a tuple\/list of 2 numbers"), +]) +def test_compute_1d_tuning_curves_continuous_errors(tsdframe, feature, nb_bins, ep, minmax, expected_exception): + with pytest.raises(TypeError, match=expected_exception): + nap.compute_1d_tuning_curves_continuous(tsdframe, feature, nb_bins, ep, minmax) + +@pytest.mark.parametrize("tsdframe, features, nb_bins, ep, minmax, expected_exception", [ + ("a",get_features(), 10, get_ep(), (0, 1), "Argument tsdframe should be of type Tsd or TsdFrame."), + (get_tsdframe(),"a", 10, get_ep(), (0, 1), r"features should be a TsdFrame with 2 columns"), + (get_tsdframe(),get_features(), "a", get_ep(), (0, 1), r"nb_bins should be of type int \(or tuple with \(int, int\) for 2D tuning curves\)."), + (get_tsdframe(),get_features(), 10, "a", (0, 1), r"ep should be an IntervalSet"), + (get_tsdframe(),get_features(), 10, get_ep(), 1, r"minmax should be a tuple\/list of 2 numbers"), +]) +def test_compute_2d_tuning_curves_continuous_errors(tsdframe, features, nb_bins, ep, minmax, expected_exception): + with pytest.raises(TypeError, match=expected_exception): + nap.compute_2d_tuning_curves_continuous(tsdframe, features, nb_bins, ep, minmax) + +######################## +# ValueError test +######################## +@pytest.mark.parametrize("func, args, minmax, expected", [ + (nap.compute_1d_tuning_curves, (get_group(), get_feature(), 10), (0,1,2), "minmax should be of length 2."), + (nap.compute_1d_tuning_curves_continuous, (get_tsdframe(), get_feature(), 10), (0,1,2), "minmax should be of length 2."), + (nap.compute_2d_tuning_curves, (get_group(), get_features(), 10), (0,1,2), "minmax should be of length 4."), + (nap.compute_2d_tuning_curves_continuous, (get_tsdframe(), get_features(), 10), (0,1,2), "minmax should be of length 4."), + (nap.compute_2d_tuning_curves, (get_group(), nap.TsdFrame(t=np.arange(10),d=np.ones((10,3))), 10), (0,1), "features should have 2 columns only."), + (nap.compute_1d_tuning_curves, (get_group(), nap.TsdFrame(t=np.arange(10),d=np.ones((10,3))), 10), (0,1), r"feature should be a Tsd \(or TsdFrame with 1 column only\)"), + (nap.compute_2d_tuning_curves, (get_group(), get_features(), (0, 1, 2)), (0,1,2,3), r"nb_bins should be of type int \(or tuple with \(int, int\) for 2D tuning curves\)."), + (nap.compute_2d_tuning_curves_continuous, (get_tsdframe(), get_features(), (0, 1, 2)), (0,1,2,3), r"nb_bins should be of type int \(or tuple with \(int, int\) for 2D tuning curves\)."), +]) +def test_compute_tuning_curves_value_error(func, args, minmax, expected): + with pytest.raises(ValueError, match=expected): + func(*args, minmax=minmax) + + +######################## +# Normal test +######################## +@pytest.mark.parametrize("group", [ + get_group() +]) +@pytest.mark.parametrize("dict_ep", [ + { 0:nap.IntervalSet(start=0, end=50), 1:nap.IntervalSet(start=50, end=100)}, + { "0":nap.IntervalSet(start=0, end=50), "1":nap.IntervalSet(start=50, end=100)} +]) +def test_compute_discrete_tuning_curves(group, dict_ep): + tc = nap.compute_discrete_tuning_curves(group, dict_ep) + assert len(tc) == 2 + assert list(tc.columns) == list(group.keys()) + assert list(tc.index.values) == list(dict_ep.keys()) + np.testing.assert_almost_equal(tc.iloc[0,0], 51/50) + np.testing.assert_almost_equal(tc.iloc[1,0], 1) +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize("args, kwargs, expected", [ + ((get_group(), get_feature(), 10), {}, np.array([10.0]+[0.0]*9)[:,None]), + ((get_group(), get_feature(), 10), {"ep":get_ep()}, np.array([10.0]+[0.0]*9)[:,None]), + ((get_group(), get_feature(), 10), {"minmax":(0, 0.9)}, np.array([10.0]+[0.0]*9)[:,None]), + ((get_group(), get_feature(), 20), {"minmax":(0, 1.9)}, np.array([10.0]+[0.0]*9+[np.nan]*10)[:,None]) +]) +def test_compute_1d_tuning_curves(args, kwargs, expected): + tc = nap.compute_1d_tuning_curves(*args, **kwargs) + # Columns + assert list(tc.columns) == list(args[0].keys()) + + # Index + assert len(tc) == args[2] + if "minmax" in kwargs: + tmp = np.linspace(kwargs["minmax"][0], kwargs["minmax"][1], args[2] + 1) + else: + tmp = np.linspace(np.min(args[1]), np.max(args[1]), args[2] + 1) + np.testing.assert_almost_equal(tmp[0:-1] + np.diff(tmp)/2, tc.index.values) + + # Array + np.testing.assert_almost_equal(tc.values, expected) + +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize("args, kwargs, expected", [ + ((get_group(), get_features(), 10), {}, np.ones((10,10))*0.5), + ((get_group(), get_features(), (10,10)), {}, np.ones((10,10))*0.5), + ((get_group(), get_features(), 10), {"ep":nap.IntervalSet(0, 400)}, np.ones((10,10))*0.25), + ((get_group(), get_features(), 10), {"minmax":(0, 100, 0, 100)}, np.ones((10,10))*0.5), + ((get_group(), get_features(), 10), {"minmax":(0, 200, 0, 100)}, np.vstack((np.ones((5,10))*0.5,np.ones((5,10))*np.nan))), +]) +def test_compute_2d_tuning_curves(args, kwargs, expected): + tc, xy = nap.compute_2d_tuning_curves(*args, **kwargs) assert isinstance(tc, dict) - assert list(tc.keys()) == list(tsgroup.keys()) - for i in tc.keys(): - assert tc[i].shape == (10, 10) - np.testing.assert_array_almost_equal(tc[0][:, 1:], np.zeros((10, 9))) - assert tc[1][5, 0] >= 1.0 - assert isinstance(xy, list) - assert len(xy) == 2 - for i in range(2): - assert np.min(xy) > 0 - assert np.max(xy) < 100 -def test_compute_2d_tuning_curves_error(): - tmp = np.vstack( - (np.repeat(np.arange(0, 100), 10), np.tile(np.arange(0, 100), 10)) - ).T - features = nap.TsdFrame(t=np.arange(0, 200, 0.1), d=np.vstack((tmp, tmp[::-1]))) - with pytest.raises(AssertionError) as e_info: - nap.compute_2d_tuning_curves([1,2,3], features, 10) - assert str(e_info.value) == "group should be a TsGroup." + # Keys + assert list(tc.keys()) == list(args[0].keys()) - tsgroup = nap.TsGroup( - {0: nap.Ts(t=np.arange(0, 100, 10)), 1: nap.Ts(t=np.array([50, 149]))} - ) - features = nap.TsdFrame(t=np.arange(100), d=np.random.rand(100, 3)) - with pytest.raises(AssertionError) as e_info: - nap.compute_2d_tuning_curves(tsgroup, features, 10) - assert str(e_info.value) == "features should have 2 columns only." - -def test_compute_2d_tuning_curves_with_ep(): - tsgroup = nap.TsGroup( - {0: nap.Ts(t=np.arange(0, 100, 10)), 1: nap.Ts(t=np.array([50, 149]))} - ) - tmp = np.vstack( - (np.repeat(np.arange(0, 100), 10), np.tile(np.arange(0, 100), 10)) - ).T - features = nap.TsdFrame( - t=np.arange(0, 300, 0.1), d=np.vstack((tmp, tmp[::-1], tmp)) - ) - ep = nap.IntervalSet(start=0, end=200) - tc, xy = nap.compute_2d_tuning_curves(tsgroup, features, 10, ep=ep) + # Index + assert isinstance(xy, list) + assert len(xy) == 2 + nb_bins = args[2] + if isinstance(args[2], int): + nb_bins = (args[2], args[2]) + if "minmax" in kwargs: + tmp1 = np.linspace(kwargs["minmax"][0], kwargs["minmax"][1], nb_bins[0] + 1) + tmp2 = np.linspace(kwargs["minmax"][2], kwargs["minmax"][3], nb_bins[1] + 1) + else: + tmp1 = np.linspace(np.min(args[1][:,0]), np.max(args[1][:,0]), nb_bins[0] + 1) + tmp2 = np.linspace(np.min(args[1][:,1]), np.max(args[1][:,1]), nb_bins[1] + 1) + + np.testing.assert_almost_equal(tmp1[0:-1] + np.diff(tmp1)/2, xy[0]) + np.testing.assert_almost_equal(tmp2[0:-1] + np.diff(tmp2)/2, xy[1]) + + # Values for i in tc.keys(): - assert tc[i].shape == (10, 10) - np.testing.assert_array_almost_equal(tc[0][:, 1:], np.zeros((10, 9))) - assert tc[1][5, 0] >= 1.0 + assert tc[i].shape == nb_bins + np.testing.assert_almost_equal(tc[i], expected) @pytest.mark.filterwarnings("ignore") -def test_compute_2d_tuning_curves_with_minmax(): - tsgroup = nap.TsGroup( - {0: nap.Ts(t=np.arange(0, 100, 10)), 1: nap.Ts(t=np.array([50, 149]))} - ) - tmp = np.vstack( - (np.repeat(np.arange(0, 100), 10), np.tile(np.arange(0, 100), 10)) - ).T - features = nap.TsdFrame(t=np.arange(0, 200, 0.1), d=np.vstack((tmp, tmp[::-1]))) - minmax = (20, 40, 0, 10) - tc, xy = nap.compute_2d_tuning_curves(tsgroup, features, 10, minmax=minmax) - - assert len(xy) == 2 - xbins = np.linspace(minmax[0], minmax[1], 11) - np.testing.assert_array_almost_equal(xy[0], xbins[0:-1] + np.diff(xbins) / 2) - ybins = np.linspace(minmax[2], minmax[3], 11) - np.testing.assert_array_almost_equal(xy[1], ybins[0:-1] + np.diff(ybins) / 2) - - -def test_compute_1d_mutual_info(): - tc = pd.DataFrame(index=np.arange(0, 2), data=np.array([0, 10])) - feature = nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)) - si = nap.compute_1d_mutual_info(tc, feature) +@pytest.mark.parametrize("args, kwargs, expected", [ + ((pd.DataFrame(index=np.arange(0, 2), data=np.array([0, 10])), + nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)),), + {}, np.array([[1.]])), + ((pd.DataFrame(index=np.arange(0, 2), data=np.array([0, 10])), + nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)),), + {"bitssec":True}, np.array([[5.]])), + ((pd.DataFrame(index=np.arange(0, 2), data=np.array([0, 10])), + nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)),), + {"ep": nap.IntervalSet(start=0, end=49)}, np.array([[1.]])), + ((pd.DataFrame(index=np.arange(0, 2), data=np.array([0, 10])), + nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)),), + {"minmax": (0, 1)}, np.array([[1.]])), + ((np.array([[0],[10]]), + nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)),), + {"minmax": (0, 1)}, np.array([[1.]])), +]) +def test_compute_1d_mutual_info(args, kwargs, expected): + tc = args[0] + feature = args[1] + si = nap.compute_1d_mutual_info(tc, feature, **kwargs) assert isinstance(si, pd.DataFrame) assert list(si.columns) == ["SI"] - assert list(si.index.values) == list(tc.columns) - np.testing.assert_approx_equal(si.loc[0, "SI"], 1.0) - si = nap.compute_1d_mutual_info(tc, feature, bitssec=True) - np.testing.assert_approx_equal(si.loc[0, "SI"], 5.0) - - ep = nap.IntervalSet(start=0, end=49) - si = nap.compute_1d_mutual_info(tc, feature, ep=ep) - np.testing.assert_approx_equal(si.loc[0, "SI"], 1.0) - si = nap.compute_1d_mutual_info(tc, feature, ep=ep, bitssec=True) - np.testing.assert_approx_equal(si.loc[0, "SI"], 5.0) - - minmax = (0, 1) - si = nap.compute_1d_mutual_info(tc, feature, minmax=minmax) - np.testing.assert_approx_equal(si.loc[0, "SI"], 1.0) - -def test_compute_1d_mutual_info_array(): - tc = np.array([[0],[10]]) - feature = nap.Tsd(t=np.arange(100), d=np.tile(np.arange(2), 50)) - si = nap.compute_1d_mutual_info(tc, feature) - assert isinstance(si, pd.DataFrame) - assert list(si.columns) == ["SI"] - np.testing.assert_approx_equal(si.loc[0, "SI"], 1.0) - si = nap.compute_1d_mutual_info(tc, feature, bitssec=True) - np.testing.assert_approx_equal(si.loc[0, "SI"], 5.0) + if isinstance(tc, pd.DataFrame): + assert list(si.index.values) == list(tc.columns) + np.testing.assert_approx_equal(si.values, expected) -def test_compute_2d_mutual_info(): - tc = {0: np.array([[0, 1], [0, 0]])} - features = nap.TsdFrame( - t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T - ) - si = nap.compute_2d_mutual_info(tc, features) + +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize("args, kwargs, expected", [ + (({0: np.array([[0, 1], [0, 0]])}, + nap.TsdFrame(t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T) + ), {}, np.array([[2.0]]) ), + ((np.array([[[0, 1], [0, 0]]]), + nap.TsdFrame(t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T) + ), {}, np.array([[2.0]])), + (({0: np.array([[0, 1], [0, 0]])}, + nap.TsdFrame(t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T) + ), {"bitssec":True}, np.array([[0.5]])), + (({0: np.array([[0, 1], [0, 0]])}, + nap.TsdFrame(t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T) + ), {"ep": nap.IntervalSet(start=0, end=7)}, np.array([[2.0]])), + (({0: np.array([[0, 1], [0, 0]])}, + nap.TsdFrame(t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T) + ), {"minmax": (0,1,0,1)}, np.array([[2.0]])), +]) +def test_compute_2d_mutual_info(args, kwargs, expected): + dict_tc = args[0] + features = args[1] + si = nap.compute_2d_mutual_info(dict_tc, features, **kwargs) assert isinstance(si, pd.DataFrame) assert list(si.columns) == ["SI"] - assert list(si.index.values) == list(tc.keys()) - np.testing.assert_approx_equal(si.loc[0, "SI"], 2.0) - si = nap.compute_2d_mutual_info(tc, features, bitssec=True) - np.testing.assert_approx_equal(si.loc[0, "SI"], 0.5) - - ep = nap.IntervalSet(start=0, end=7) - si = nap.compute_2d_mutual_info(tc, features, ep=ep) - np.testing.assert_approx_equal(si.loc[0, "SI"], 2.0) - si = nap.compute_2d_mutual_info(tc, features, ep=ep, bitssec=True) - np.testing.assert_approx_equal(si.loc[0, "SI"], 0.5) - - minmax = (0, 1, 0, 1) - si = nap.compute_2d_mutual_info(tc, features, minmax=minmax) - np.testing.assert_approx_equal(si.loc[0, "SI"], 2.0) - - -@pytest.mark.parametrize( - "tsd, expected_columns", - [ - (nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 1))), [0]), - (nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 2))), [0, 1]), - (nap.Tsd(t=np.arange(0, 100), d=np.ones((100, ))), [0]) - ] -) -def test_compute_1d_tuning_curves_continuous(tsd, expected_columns): - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - tc = nap.compute_1d_tuning_curves_continuous(tsd, feature, nb_bins=10) - - assert len(tc) == 10 - assert list(tc.columns) == expected_columns - np.testing.assert_array_almost_equal(tc[0].values[1:], np.zeros(9)) - assert int(tc[0].values[0]) == 1.0 - - -@pytest.mark.parametrize( - "tsdframe, expectation", - [ - (nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 1))), does_not_raise()), - ([1, 2, 3], pytest.raises(RuntimeError, match="Unknown format for tsdframe.")), - ( - nap.TsdTensor(t=np.arange(0, 100), d=np.ones((100, 1, 1))), - pytest.raises(RuntimeError, match="Unknown format for tsdframe.") - ), - ] -) -@pytest.mark.parametrize( - "feature", - [ - nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0), - nap.TsdFrame(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1)[:, None] % 1.0) - ] -) -def test_compute_1d_tuning_curves_continuous_error_tsdframe(tsdframe, expectation, feature): - with expectation: - nap.compute_1d_tuning_curves_continuous(tsdframe, feature, nb_bins=10) - - -@pytest.mark.parametrize( - "feature, expectation", - [ - (nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0), does_not_raise()), - (nap.TsdFrame(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1)[:, None] % 1.0), does_not_raise()), - ( - nap.TsdFrame(t=np.arange(0, 100, 0.1), d=np.arange(0, 200, 0.1).reshape(1000, 2) % 1.0), - pytest.raises(AssertionError, match=r"feature should be a Tsd \(or TsdFrame with 1 column only\)")) - ] -) -@pytest.mark.parametrize( - "tsd", - [ - nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 1))), - nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 2))), - nap.Tsd(t=np.arange(0, 100), d=np.ones((100, ))) - ] -) -def test_compute_1d_tuning_curves_continuous_error_featues(tsd, feature, expectation): - with expectation: - nap.compute_1d_tuning_curves_continuous(tsd, feature, nb_bins=10) - - -def test_compute_1d_tuning_curves_continuous_with_ep(): - tsdframe = nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 1))) - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - ep = nap.IntervalSet(start=0, end=50) - tc1 = nap.compute_1d_tuning_curves_continuous(tsdframe, feature, nb_bins=10) - tc2 = nap.compute_1d_tuning_curves_continuous(tsdframe, feature, nb_bins=10, ep=ep) - pd.testing.assert_frame_equal(tc1, tc2) - - -def test_compute_1d_tuning_curves_continuous_with_min_max(): - tsdframe = nap.TsdFrame(t=np.arange(0, 100), d=np.ones((100, 1))) - feature = nap.Tsd(t=np.arange(0, 100, 0.1), d=np.arange(0, 100, 0.1) % 1.0) - tc = nap.compute_1d_tuning_curves_continuous( - tsdframe, feature, nb_bins=10, minmax=(0, 1) - ) - assert len(tc) == 10 - np.testing.assert_array_almost_equal(tc[0].values[1:], np.zeros(9)) - assert tc[0].values[0] == 1.0 - -@pytest.mark.parametrize( - "tsdframe, expected_columns", - [ - (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), [0, 1]), - ( - nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2)), - columns=["x", "y"]), - ["x", "y"] - ), - (nap.Tsd(t=np.arange(0, 100), d=np.hstack((np.ones((100, )) * 2))), [0]) - - ] -) -@pytest.mark.parametrize("nb_bins", [1, 2, 3]) -def test_compute_2d_tuning_curves_continuous(nb_bins, tsdframe, expected_columns): + if isinstance(dict_tc, dict): + assert list(si.index.values) == list(dict_tc.keys()) + np.testing.assert_approx_equal(si.values, expected) + +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize("args, kwargs, expected", [ + ((get_tsdframe(), get_feature(), 10), {}, np.vstack((np.ones((1,2)), np.zeros((9,2))))), + ((get_tsdframe(), get_feature()[:,np.newaxis], 10), {}, np.vstack((np.ones((1,2)), np.zeros((9,2))))), + ((get_tsdframe()[:,0], get_feature(), 10), {}, np.vstack((np.ones((1,1)), np.zeros((9,1))))), + ((get_tsdframe(), get_feature(), 10), {"ep":get_ep()}, np.vstack((np.ones((1,2)), np.zeros((9,2))))), + ((get_tsdframe(), get_feature(), 10), {"minmax":(0, 0.9)}, np.vstack((np.ones((1,2)), np.zeros((9,2))))), + ((get_tsdframe(), get_feature(), 20), {"minmax":(0, 1.9)}, np.vstack((np.ones((1,2)), np.zeros((9,2)), np.ones((10,2))*np.nan))), +]) +def test_compute_1d_tuning_curves_continuous(args, kwargs, expected): + tsdframe, feature, nb_bins = args + tc = nap.compute_1d_tuning_curves_continuous(tsdframe, feature, nb_bins, **kwargs) + # Columns + if hasattr(tsdframe, "columns"): + assert list(tc.columns) == list(tsdframe.columns) + # Index + assert len(tc) == nb_bins + if "minmax" in kwargs: + tmp = np.linspace(kwargs["minmax"][0], kwargs["minmax"][1], nb_bins + 1) + else: + tmp = np.linspace(np.min(args[1]), np.max(args[1]), nb_bins + 1) + np.testing.assert_almost_equal(tmp[0:-1] + np.diff(tmp)/2, tc.index.values) + # Array + np.testing.assert_almost_equal(tc.values, expected) + + + +@pytest.mark.parametrize("tsdframe, nb_bins, kwargs, expected", [ + (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), + 1, {}, {0: np.array([[1.]]), 1: np.array([[2.]])}), + (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2)), columns=["x", "y"]), + 2, {}, {'x': np.array([[1,0],[0,0]]), 'y': np.array([[2,0],[0,0]])}), + (nap.Tsd(t=np.arange(0, 100), d=np.hstack((np.ones((100, )) * 2))), + 2, {}, {0: np.array([[2, 0], [0, 0]])}), + (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), + (1,2), {}, {0: np.array([[1., 0.0]]), 1: np.array([[2., 0.0]])}), + (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), + 1, {"ep":get_ep()}, {0: np.array([[1.]]), 1: np.array([[2.]])}), + (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), + 1, {"minmax": (0, 1, 0, 1)}, {0: np.array([[1.]]), 1: np.array([[2.]])}), + (nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), + (1, 3), {"minmax": (0, 1, 0, 3)}, {0: np.array([[1., 1., np.nan]]), 1: np.array([[2., 2., np.nan]])}), +]) +def test_compute_2d_tuning_curves_continuous(tsdframe, nb_bins, kwargs, expected): features = nap.TsdFrame( t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T ) - tc, xy = nap.compute_2d_tuning_curves_continuous(tsdframe, features, nb_bins) + tc, xy = nap.compute_2d_tuning_curves_continuous(tsdframe, features, nb_bins, **kwargs) - assert isinstance(tc, dict) - assert list(tc.keys()) == expected_columns - for i in tc.keys(): - assert tc[i].shape == (nb_bins, nb_bins) + # Keys + if hasattr(tsdframe, "columns"): + assert list(tc.keys()) == list(tsdframe.columns) + # Index assert isinstance(xy, list) assert len(xy) == 2 - for i in range(2): - assert np.min(xy) > 0 - assert np.max(xy) < 1 - - -def test_compute_2d_tuning_curves_continuous_output_value(): - tsdframe = nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))) - features = nap.TsdFrame( - t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T - ) - tc, xy = nap.compute_2d_tuning_curves_continuous(tsdframe, features, 2) - tmp = np.zeros((2, 2, 2)) - tmp[:, 0, 0] = [1, 2] - for i in range(2): - np.testing.assert_array_almost_equal(tc[i], tmp[i]) - - -def test_compute_2d_tuning_curves_continuous_with_ep(): - tsdframe = nap.TsdFrame( - t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2)) - ) - features = nap.TsdFrame( - t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T - ) - ep = nap.IntervalSet(start=0, end=7) - tc1, xy = nap.compute_2d_tuning_curves_continuous(tsdframe, features, 2, ep=ep) - tc2, xy = nap.compute_2d_tuning_curves_continuous(tsdframe, features, 2) - - for i in tc1.keys(): - np.testing.assert_array_almost_equal(tc1[i], tc2[i]) - - -def test_compute_2d_tuning_curves_continuous_error_tsdframe(): - features = nap.TsdFrame( - t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T - ) - with pytest.raises(RuntimeError) as e_info: - nap.compute_2d_tuning_curves_continuous([1,2,3], features, 2) - assert str(e_info.value) == "Unknown format for tsdframe." - -@pytest.mark.parametrize( - "features, expectation", - [ - ([1, 2, 3], pytest.raises(AssertionError, match="features should be a TsdFrame with 2 columns")), - (nap.TsdFrame(t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1], [0,0,0,0]]), 25).T), - pytest.raises(AssertionError, match="features should have 2 columns only")), - - ] - -) -@pytest.mark.parametrize( - "tsdframe", - [ - nap.TsdFrame(t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2))), - nap.Tsd(t=np.arange(0, 100), d=np.ones((100, ))) - ] -) -def test_compute_2d_tuning_curves_continuous_error_feature(tsdframe, features, expectation): - with expectation: - nap.compute_2d_tuning_curves_continuous(tsdframe, features, 2) - -@pytest.mark.filterwarnings("ignore") -def test_compute_2d_tuning_curves_with_minmax(): - tsdframe = nap.TsdFrame( - t=np.arange(0, 100), d=np.hstack((np.ones((100, 1)), np.ones((100, 1)) * 2)) - ) - features = nap.TsdFrame( - t=np.arange(100), d=np.tile(np.array([[0, 0, 1, 1], [0, 1, 0, 1]]), 25).T - ) - minmax = (0, 10, 0, 2) - tc, xy = nap.compute_2d_tuning_curves_continuous( - tsdframe, features, 2, minmax=minmax - ) + if isinstance(nb_bins, int): + nb_bins = (nb_bins, nb_bins) + if "minmax" in kwargs: + tmp1 = np.linspace(kwargs["minmax"][0], kwargs["minmax"][1], nb_bins[0] + 1) + tmp2 = np.linspace(kwargs["minmax"][2], kwargs["minmax"][3], nb_bins[1] + 1) + else: + tmp1 = np.linspace(np.min(features[:,0]), np.max(features[:,0]), nb_bins[0] + 1) + tmp2 = np.linspace(np.min(features[:,1]), np.max(features[:,1]), nb_bins[1] + 1) + + np.testing.assert_almost_equal(tmp1[0:-1] + np.diff(tmp1)/2, xy[0]) + np.testing.assert_almost_equal(tmp2[0:-1] + np.diff(tmp2)/2, xy[1]) + + # Values + for i in tc.keys(): + assert tc[i].shape == nb_bins + np.testing.assert_almost_equal(tc[i], expected[i]) - assert len(xy) == 2 - xbins = np.linspace(minmax[0], minmax[1], 3) - np.testing.assert_array_almost_equal(xy[0], xbins[0:-1] + np.diff(xbins) / 2) - ybins = np.linspace(minmax[2], minmax[3], 3) - np.testing.assert_array_almost_equal(xy[1], ybins[0:-1] + np.diff(ybins) / 2)