From b5a7009c16681b82673e7bf517283fa16b2bbb5e Mon Sep 17 00:00:00 2001 From: Nikolaos Perrakis Date: Fri, 31 May 2024 00:15:37 +0300 Subject: [PATCH 1/4] update wasserstein and linting --- nannyml/drift/univariate/calculator.py | 4 +- nannyml/drift/univariate/methods.py | 92 +++++++++++++++----------- nannyml/drift/univariate/result.py | 11 +-- 3 files changed, 62 insertions(+), 45 deletions(-) diff --git a/nannyml/drift/univariate/calculator.py b/nannyml/drift/univariate/calculator.py index 836a3b732..f072083e6 100644 --- a/nannyml/drift/univariate/calculator.py +++ b/nannyml/drift/univariate/calculator.py @@ -111,7 +111,6 @@ def __init__( chunker : Chunker The `Chunker` used to split the data sets into a lists of chunks. thresholds: dict - Defaults to:: { @@ -136,8 +135,7 @@ def __init__( The `chi2` method does not support custom thresholds for now. Additional research is required to determine how to transition from its current p-value based implementation. - computation_params : dict - + computation_params: dict Defaults to:: { diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index b5710318d..ded5023cc 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -2,7 +2,7 @@ # # License: Apache Software License 2.0 -""" This module contains the different drift detection method implementations. +"""This module contains the different drift detection method implementations. The :class:`~nannyml.drift.univariate.methods.MethodFactory` will convert the drift detection method names into an instance of the base :class:`~nannyml.drift.univariate.methods.Method` class. @@ -62,10 +62,8 @@ def __init__( computation_params : dict, default=None A dictionary specifying parameter names and values to be used in the computation of the drift method. - upper_threshold : float, default=None - An optional upper threshold for the data quality metric. 
- lower_threshold : float, default=None - An optional lower threshold for the data quality metric. + threshold : Threshold + Threshold class defining threshold strategy. upper_threshold_limit : float, default=None An optional upper threshold limit for the data quality metric. lower_threshold_limit : float, default=0 @@ -257,6 +255,7 @@ class JensenShannonDistance(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Jensen-Shannon method.""" super().__init__( display_name='Jensen-Shannon distance', column_name='jensen_shannon', @@ -339,6 +338,7 @@ class KolmogorovSmirnovStatistic(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Kolmogorov-Smirnov method.""" super().__init__( display_name='Kolmogorov-Smirnov statistic', column_name='kolmogorov_smirnov', @@ -405,7 +405,7 @@ def _calculate(self, data: pd.Series): chunk_rel_freqs = chunk_proba_in_qts / len(data) rel_freq_lower_than_edges = len(data[data < self._qts[0]]) / len(data) chunk_rel_freqs = rel_freq_lower_than_edges + np.cumsum(chunk_rel_freqs) - stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs)) + stat = np.max(abs(self._ref_rel_freqs - chunk_rel_freqs)) # type: ignore else: stat, _ = ks_2samp(self._reference_data, data) @@ -420,6 +420,7 @@ class Chi2Statistic(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Chi2-contingency method.""" super().__init__( display_name='Chi2 statistic', column_name='chi2', @@ -444,6 +445,16 @@ def __init__(self, **kwargs) -> None: self._fitted = False def fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None) -> Self: + """Fits Chi2 Method on reference data. + + Parameters + ---------- + reference_data: pd.Series + The reference data used for fitting a Method. Must have target data available. + timestamps: Optional[pd.Series], default=None + A series containing the reference data timestamps + + """ super().fit(reference_data, timestamps) # Thresholding is based on p-values. 
Ignoring all custom thresholding and disable plotting a threshold @@ -470,6 +481,16 @@ def _calculate(self, data: pd.Series): return stat def alert(self, value: float): + """Evaluates if an alert has occurred for Chi2 on the current chunk data. + + For Chi2 alerts are based on p-values rather than the actual method values like + in all other Univariate drift methods. + + Parameters + ---------- + value: float + The method value for a given chunk + """ return self._p_value < 0.05 def _calc_chi2(self, data: pd.Series): @@ -491,6 +512,7 @@ class LInfinityDistance(Method): """ def __init__(self, **kwargs) -> None: + """Initialize L-Infinity Distance method.""" super().__init__( display_name='L-Infinity distance', column_name='l_infinity', @@ -537,6 +559,7 @@ class WassersteinDistance(Method): """ def __init__(self, **kwargs) -> None: + """Initialize Wasserstein Distance method.""" super().__init__( display_name='Wasserstein distance', column_name='wasserstein', @@ -579,6 +602,9 @@ def _fit(self, reference_data: pd.Series, timestamps: Optional[pd.Series] = None reference_proba_in_bins, self._bin_edges = np.histogram(reference_data, bins=self.n_bins) self._ref_rel_freqs = reference_proba_in_bins / len(reference_data) self._bin_width = self._bin_edges[1] - self._bin_edges[0] + self._ref_min = self._bin_edges[0] + self._ref_max = self._bin_edges[-1] + self._ref_cdf = np.cumsum(self._ref_rel_freqs) self._fitted = True self._reference_size = len(reference_data) @@ -596,42 +622,31 @@ def _calculate(self, data: pd.Series): if ( self.calculation_method == 'auto' and self._reference_size >= 10_000 ) or self.calculation_method == 'estimated': - min_chunk = np.min(data) - - if min_chunk < self._bin_edges[0]: - extra_bins_left = (min_chunk - self._bin_edges[0]) / self._bin_width - extra_bins_left = np.ceil(extra_bins_left) + data_histogram, _ = np.histogram(data, bins=self._bin_edges) + data_histogram = data_histogram / len(data) + data_smaller = data[data < self._ref_min] + 
data_bigger = data[data > self._ref_max] + sample_size = len(data) + del data + + if len(data_smaller) > 0: + amount_smaller = len(data_smaller) / sample_size + term_smaller = wasserstein_distance(data_smaller, np.full(len(data_smaller), self._ref_min)) + term_smaller = term_smaller * amount_smaller else: - extra_bins_left = 0 + term_smaller, amount_smaller = 0, 0 - max_chunk = np.max(data) - - if max_chunk > self._bin_edges[-1]: - extra_bins_right = (max_chunk - self._bin_edges[-1]) / self._bin_width - extra_bins_right = np.ceil(extra_bins_right) + if len(data_bigger) > 0: + amount_bigger = len(data_bigger) / sample_size + term_bigger = wasserstein_distance(data_bigger, np.full(len(data_bigger), self._ref_max)) + term_bigger = term_bigger * amount_bigger else: - extra_bins_right = 0 - - left_edges_to_prepand = np.arange( - min_chunk - self._bin_width, self._bin_edges[0] - self._bin_width, self._bin_width - ) - right_edges_to_append = np.arange( - self._bin_edges[-1] + self._bin_width, max_chunk + self._bin_width, self._bin_width - ) - - updated_edges = np.concatenate([left_edges_to_prepand, self._bin_edges, right_edges_to_append]) - updated_ref_binned_pdf = np.concatenate( - [np.zeros(len(left_edges_to_prepand)), self._ref_rel_freqs, np.zeros(len(right_edges_to_append))] - ) - - chunk_histogram, _ = np.histogram(data, bins=updated_edges) - - chunk_binned_pdf = chunk_histogram / len(data) - - ref_binned_cdf = np.cumsum(updated_ref_binned_pdf) - chunk_binned_cdf = np.cumsum(chunk_binned_pdf) + term_bigger, amount_bigger = 0, 0 - distance = np.sum(np.abs(ref_binned_cdf - chunk_binned_cdf) * self._bin_width) + data_cdf = np.cumsum(data_histogram) + data_cdf = data_cdf + amount_smaller # if there's some data on the left-hand side + term_within = np.sum(np.abs(self._ref_cdf - data_cdf) * self._bin_width) + distance = term_within + term_smaller + term_bigger else: distance = wasserstein_distance(self._reference_data, data) @@ -644,6 +659,7 @@ class 
HellingerDistance(Method): """Calculates the Hellinger Distance between two distributions.""" def __init__(self, **kwargs) -> None: + """Initialize Hellinger Distance method.""" super().__init__( display_name='Hellinger distance', column_name='hellinger', diff --git a/nannyml/drift/univariate/result.py b/nannyml/drift/univariate/result.py index c1d31df1f..7cce49fa5 100644 --- a/nannyml/drift/univariate/result.py +++ b/nannyml/drift/univariate/result.py @@ -44,7 +44,8 @@ def __init__( analysis_data: pd.DataFrame = None, reference_data: pd.DataFrame = None, ): - """ + """Initialize results class. + Parameters ---------- results_data: pd.DataFrame @@ -112,6 +113,7 @@ def __init__( @property def methods(self) -> List[Method]: + """Methods used during calculation.""" return cast(List[Method], self.metrics) def _filter( @@ -167,9 +169,9 @@ def _get_result_property(self, property_name: str) -> List[pd.Series]: return continuous_values + categorical_values def keys(self) -> List[Key]: - """ - Creates a list of keys for continuos and categorial columns where each Key is a `namedtuple('Key', - 'properties display_names')` + """Creates a list of keys for continuous and categorical columns. + + Each Key is a `namedtuple('Key', 'properties display_names')` """ continuous_keys = [ Key(properties=(column, method.column_name), display_names=(column, method.display_name)) @@ -204,6 +206,7 @@ def plot( - 'distribution' plots feature distribution per :class:`~nannyml.chunk.Chunk`. Joyplot for continuous features, stacked bar charts for categorical features. 
+ Returns ------- fig: :class:`plotly.graph_objs._figure.Figure` From f362f5c6ce2bfbf7cd4a15d81db351abf3c640f7 Mon Sep 17 00:00:00 2001 From: Nikolaos Perrakis Date: Fri, 31 May 2024 13:03:53 +0300 Subject: [PATCH 2/4] wassserstein update --- nannyml/drift/univariate/methods.py | 35 ++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index ded5023cc..04a64bcf3 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -622,36 +622,49 @@ def _calculate(self, data: pd.Series): if ( self.calculation_method == 'auto' and self._reference_size >= 10_000 ) or self.calculation_method == 'estimated': - data_histogram, _ = np.histogram(data, bins=self._bin_edges) - data_histogram = data_histogram / len(data) data_smaller = data[data < self._ref_min] data_bigger = data[data > self._ref_max] - sample_size = len(data) - del data - - if len(data_smaller) > 0: - amount_smaller = len(data_smaller) / sample_size - term_smaller = wasserstein_distance(data_smaller, np.full(len(data_smaller), self._ref_min)) + n_smaller = len(data_smaller) + n_bigger = len(data_bigger) + + if n_smaller > 0: + amount_smaller = (n_smaller + 1) / len(data) + smaller_with_first_ref_value = np.concatenate((data_smaller, [self._ref_min])) + x, y = self._ecdf(smaller_with_first_ref_value) + term_smaller = np.sum((y)[:-1] * np.diff(x)) term_smaller = term_smaller * amount_smaller else: term_smaller, amount_smaller = 0, 0 - if len(data_bigger) > 0: - amount_bigger = len(data_bigger) / sample_size - term_bigger = wasserstein_distance(data_bigger, np.full(len(data_bigger), self._ref_max)) + if n_bigger > 0: + amount_bigger = (n_bigger + 1) / len(data) + bigger_with_last_ref_value = np.concatenate(([self._ref_max], data_bigger)) + x, y = self._ecdf(bigger_with_last_ref_value) + term_bigger = np.sum((1 - y)[: -1] * np.diff(x)) term_bigger = term_bigger * 
amount_bigger else: term_bigger, amount_bigger = 0, 0 + data_histogram, _ = np.histogram(data, bins=self._bin_edges) + data_histogram = data_histogram / len(data) + data_cdf = np.cumsum(data_histogram) data_cdf = data_cdf + amount_smaller # if there's some data on the left-hand side term_within = np.sum(np.abs(self._ref_cdf - data_cdf) * self._bin_width) + distance = term_within + term_smaller + term_bigger else: distance = wasserstein_distance(self._reference_data, data) return distance + def _ecdf(self, vec: np.ndarray): + """Custom implementation to calculate ECDF.""" + vec = np.sort(vec) + x, counts = np.unique(vec, return_counts=True) + cdf = np.cumsum(counts) / len(vec) + return x, cdf + @MethodFactory.register(key='hellinger', feature_type=FeatureType.CONTINUOUS) @MethodFactory.register(key='hellinger', feature_type=FeatureType.CATEGORICAL) From 7fb04a83caa225a641eb1470a840486b6b9d2ebf Mon Sep 17 00:00:00 2001 From: Nikolaos Perrakis Date: Fri, 31 May 2024 13:17:24 +0300 Subject: [PATCH 3/4] add extra was out of ref support test --- tests/drift/test_univariate_drift_methods.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/drift/test_univariate_drift_methods.py b/tests/drift/test_univariate_drift_methods.py index 7a7978510..50d742c88 100644 --- a/tests/drift/test_univariate_drift_methods.py +++ b/tests/drift/test_univariate_drift_methods.py @@ -118,6 +118,16 @@ def test_wasserstein_both_continuous_analysis_with_neg_mean_medium_drift(): # n assert wass_dist == 3.99 +def test_wasserstein_both_continuous_analysis_estimate_with_out_of_reference_drift(): # noqa: D103 + np.random.seed(1) + reference = pd.Series(np.random.normal(0, 1, 15_000), name='A') + analysis = pd.Series(np.random.normal(0, 10, 1_000_000), name='A') + wass_dist = WassersteinDistance(chunker=chunker, threshold=threshold) + wass_dist = wass_dist.fit(reference).calculate(analysis) + wass_dist = np.round(wass_dist, 3) + assert wass_dist == 7.180 + + # ************* Hellinger 
Tests ************* From 878dc6091e59f93b3c025dc08c87d06ca0982ed9 Mon Sep 17 00:00:00 2001 From: Niels Nuyttens Date: Thu, 6 Jun 2024 15:31:55 +0200 Subject: [PATCH 4/4] Declare attributes in init --- nannyml/drift/univariate/methods.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nannyml/drift/univariate/methods.py b/nannyml/drift/univariate/methods.py index 04a64bcf3..a635c8318 100644 --- a/nannyml/drift/univariate/methods.py +++ b/nannyml/drift/univariate/methods.py @@ -582,6 +582,9 @@ def __init__(self, **kwargs) -> None: self._bin_width: float self._bin_edges: np.ndarray self._ref_rel_freqs: Optional[np.ndarray] = None + self._ref_min: float + self._ref_max: float + self._ref_cdf: np.ndarray self._fitted = False if ( (not kwargs) @@ -640,7 +643,7 @@ def _calculate(self, data: pd.Series): amount_bigger = (n_bigger + 1) / len(data) bigger_with_last_ref_value = np.concatenate(([self._ref_max], data_bigger)) x, y = self._ecdf(bigger_with_last_ref_value) - term_bigger = np.sum((1 - y)[: -1] * np.diff(x)) + term_bigger = np.sum((1 - y)[:-1] * np.diff(x)) term_bigger = term_bigger * amount_bigger else: term_bigger, amount_bigger = 0, 0