From 0e87b804efc7f55d75402bf5cfdea87e66cd9232 Mon Sep 17 00:00:00 2001 From: Hoang Anh Ngo <50743576+hoanganhngo610@users.noreply.github.com> Date: Tue, 31 Oct 2023 00:05:57 +0700 Subject: [PATCH] Implementation of the incremental Kolmogorov-Smirnov Statistics (#1354) * Initial implementation of incremental KS statistics * Update docstring * Refactor incremental KS from metrics to stats. * Modify import in test_ks.py file * Refactor import in test_ks.py file * Refactor import for test_ks.py * Add test to assert whether the two sliding windows follow the same distribution with the result from the KS test. * Run black code formatter on kolmogorov_smirnov.py file * Fix test_ks.py with ruff v0.1.2 * Commit from suggestion of code reviewer. Co-authored-by: Max Halford * Update code from reviewer's suggestion. Co-authored-by: Max Halford * Update code from reviewer's suggestion. Co-authored-by: Max Halford * Trim trailing whitespace of kolmogorov_smirnov.py file * Add description to UNRELEASED.md --------- Co-authored-by: Max Halford --- docs/releases/unreleased.md | 4 + river/stats/__init__.py | 2 + river/stats/kolmogorov_smirnov.py | 291 ++++++++++++++++++++++++++++++ river/stats/test_ks.py | 45 +++++ 4 files changed, 342 insertions(+) create mode 100644 river/stats/kolmogorov_smirnov.py create mode 100644 river/stats/test_ks.py diff --git a/docs/releases/unreleased.md b/docs/releases/unreleased.md index 1dd67a69b6..4973eb8e35 100644 --- a/docs/releases/unreleased.md +++ b/docs/releases/unreleased.md @@ -47,6 +47,10 @@ River's mini-batch methods now support pandas v2. In particular, River conforms - Fix a bug in `tree.splitter.NominalSplitterClassif` that generated a mismatch between the number of existing tree branches and the number of tracked branches. - Fix a bug in `tree.ExtremelyFastDecisionTreeClassifier` where the split re-evaluation failed when the current branch's feature was not available as a split option. The fix also enables the tree to pre-prune a leaf via the tie-breaking mechanism. +## stats + +- Implementation of the incremental Kolmogorov-Smirnov statistics (at `stats.KolmogorovSmirnov`), with the option to calculate either the original KS or Kuiper's test. + ## utils - Removed `utils.dict2numpy` and `utils.numpy2dict` functions. They were not used anywhere in the library. diff --git a/river/stats/__init__.py b/river/stats/__init__.py index 50c0625ef5..0b88c19d4b 100644 --- a/river/stats/__init__.py +++ b/river/stats/__init__.py @@ -9,6 +9,7 @@ from .ewmean import EWMean from .ewvar import EWVar from .iqr import IQR, RollingIQR +from .kolmogorov_smirnov import KolmogorovSmirnov from .kurtosis import Kurtosis from .link import Link from .mad import MAD @@ -37,6 +38,7 @@ "EWMean", "EWVar", "IQR", + "KolmogorovSmirnov", "Kurtosis", "Link", "MAD", diff --git a/river/stats/kolmogorov_smirnov.py b/river/stats/kolmogorov_smirnov.py new file mode 100644 index 0000000000..873e3dd731 --- /dev/null +++ b/river/stats/kolmogorov_smirnov.py @@ -0,0 +1,291 @@ +from __future__ import annotations + +import math +import random + +from river import base, stats + +__all__ = ["KolmogorovSmirnov"] + + +class Treap(base.Base): + """Class representing Treap (Cartesian Tree) used to calculate the Incremental KS statistics.""" + + def __init__(self, key, value=0): + self.key = key + self.value = value + self.priority = random.random() + self.size = 1 + self.height = 1 + self.lazy = 0 + self.max_value = value + self.min_value = value + self.left = None + self.right = None + + @staticmethod + def sum_all(node, value): + if node is None: + return + node.value += value + node.max_value += value + node.min_value += value + node.lazy += value + + @classmethod + def unlazy(cls, node): + cls.sum_all(node.left, node.lazy) + cls.sum_all(node.right, node.lazy) + node.lazy = 0 + + @classmethod + def update(cls, node): + if node is None: + return + cls.unlazy(node) + node.size = 1 + node.height = 0 + node.max_value = node.value + node.min_value = node.value + + if node.left is not None: + node.size += node.left.size + node.height += node.left.height + node.max_value = max(node.max_value, node.left.max_value) + node.min_value = min(node.min_value, node.left.min_value) + + if node.right is not None: + node.size += node.right.size + node.height = max(node.height, node.right.height) + node.max_value = max(node.max_value, node.right.max_value) + node.min_value = min(node.min_value, node.right.min_value) + + node.height += 1 + + @classmethod + def split_keep_right(cls, node, key): + if node is None: + return None, None + + left, right = None, None + + cls.unlazy(node) + + if key <= node.key: + left, node.left = cls.split_keep_right(node.left, key) + right = node + else: + node.right, right = cls.split_keep_right(node.right, key) + left = node + + cls.update(left) + cls.update(right) + + return left, right + + @classmethod + def merge(cls, left, right): + if left is None: + return right + if right is None: + return left + + node = None + + if left.priority > right.priority: + cls.unlazy(left) + left.right = cls.merge(left.right, right) + node = left + else: + cls.unlazy(right) + right.left = cls.merge(left, right.left) + node = right + + cls.update(node) + + return node + + @classmethod + def split_smallest(cls, node): + if node is None: + return None, None + + left, right = None, None + + cls.unlazy(node) + + if node.left is not None: + left, node.left = cls.split_smallest(node.left) + right = node + else: + right = node.right + node.right = None + left = node + + cls.update(left) + cls.update(right) + + return left, right + + @classmethod + def split_greatest(cls, node): + if node is None: + return None, None + + cls.unlazy(node) + + if node.right is not None: + node.right, right = cls.split_greatest(node.right) + left = node + else: + left = node.left + node.left = None + right = node + + cls.update(left) + cls.update(right) + + return left, right + + @staticmethod + def get_size(node): + return 0 if node is None else node.size + + @staticmethod + def get_height(node): + return 0 if node is None else node.height + + +class KolmogorovSmirnov(stats.base.Bivariate): + r"""Incremental Kolmogorov-Smirnov statistics. + + The two-sample Kolmogorov-Smirnov test quantifies the distance between the empirical functions of two samples, + with the null distribution of this statistic is calculated under the null hypothesis that the samples are drawn from + the same distribution. The formula can be described as + + $$ + D_{n, m} = \sup_x \| F_{1, n}(x) - F_{2, m}(x) \|. + $$ + + This implementation is the incremental version of the previously mentioned statistics, with the change being in + the ability to insert and remove an observation thorugh time. This can be done using a randomized tree called + Treap (or Cartesian Tree) [^2] with bulk operation and lazy propagation. + + The implemented algorithm is able to perform the insertion and removal operations + in O(logN) with high probability and calculate the Kolmogorov-Smirnov test in O(1), + where N is the number of sample observations. This is a significant improvement compared + to the O(N logN) cost of non-incremental implementation. + + This implementation also supports the calculation of the Kuiper statistics. Different from the orginial + Kolmogorov-Smirnov statistics, Kuiper's test [^3] calculates the sum of the absolute sizes of the most positive and + most negative differences between the two cumulative distribution functions taken into account. As such, + Kuiper's test is very sensitive in the tails as at the median. + + Last but not least, this implementation is also based on the original implementation within the supplementary + material of the authors of paper [^1], at + [the following Github repository](https://github.com/denismr/incremental-ks/tree/master). + + Parameters + ---------- + statistic + The method used to calculate the statistic, can be either "ks" or "kuiper". + The default value is set as "ks". + + Examples + -------- + + >>> import numpy as np + >>> from river import stats + + >>> stream_a = [1, 1, 2, 2, 3, 3, 4, 4] + >>> stream_b = [1, 1, 1, 1, 2, 2, 2, 2] + + >>> incremental_ks = stats.KolmogorovSmirnov(statistic="ks") + >>> for a, b in zip(stream_a, stream_b): + ... incremental_ks.update(a, b) + + >>> incremental_ks + KolmogorovSmirnov: 0.5 + + >>> incremental_ks.n_samples + 8 + + References + ---------- + [^1]: dos Reis, D.M. et al. (2016) ‘Fast unsupervised online drift detection using incremental Kolmogorov-Smirnov + test’, Proceedings of the 22nd ACM SIGKDD International Conference on Knowledge Discovery and Data Mining. + doi:10.1145/2939672.2939836. + [^2]: C. R. Aragon and R. G. Seidel. Randomized search trees. In FOCS, pages 540–545. IEEE, 1989. + [^3]: Kuiper, N. H. (1960). "Tests concerning random points on a circle". + Proceedings of the Koninklijke Nederlandse Akademie van Wetenschappen, Series A. 63: 38–47. + + """ + + def __init__(self, statistic="ks"): + self.treap = None + self.n_samples = 0 + self.statistic = statistic + + @staticmethod + def ca(p_value): + return (-0.5 * math.log(p_value)) ** 0.5 + + @classmethod + def ks_threshold(cls, p_value, n_samples): + return cls.ca(p_value) * (2.0 * n_samples / n_samples**2) + + def update(self, x, y): + keys = ((x, 0), (y, 1)) + + self.n_samples += 1 + + for key in keys: + left, left_g, right, val = None, None, None, None + + left, right = Treap.split_keep_right(self.treap, key) + left, left_g = Treap.split_greatest(left) + val = 0 if left_g is None else left_g.value + + left = Treap.merge(left, left_g) + right = Treap.merge(Treap(key, val), right) + Treap.sum_all(right, 1 if key[1] == 0 else -1) + + self.treap = Treap.merge(left, right) + + def revert(self, x, y): + keys = ((x, 0), (y, 1)) + + self.n_samples -= 1 + + for key in keys: + left, right, right_l = None, None, None + + left, right = Treap.split_keep_right(self.treap, key) + right_l, right = Treap.split_smallest(right) + + if right_l is not None and right_l.key == key: + Treap.sum_all(right, -1 if key[1] == 0 else 1) + else: + right = Treap.merge(right_l, right) + + self.treap = Treap.merge(left, right) + + def get(self): + assert self.statistic in ["ks", "kuiper"] + if self.n_samples == 0: + return 0 + + if self.statistic == "ks": + return max(self.treap.max_value, -self.treap.min_value) / self.n_samples + elif self.statistic == "kuiper": + return max(self.treap.max_value - self.treap.min_value) / self.n_samples + else: + raise ValueError(f"Unknown statistic {self.statistic}, expected one of: ks, kuiper") + + def test_ks_threshold(self, ca): + """ + Test whether the reference and sliding window follows the same or different probability distribution. + This test will return `True` if we **reject** the null hypothesis that + the two windows follow the same distribution. + """ + return self.get() > ca * (2 * self.n_samples / self.n_samples**2) ** 0.5 diff --git a/river/stats/test_ks.py b/river/stats/test_ks.py new file mode 100644 index 0000000000..baa979189e --- /dev/null +++ b/river/stats/test_ks.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from collections import deque + +import numpy as np +from scipy.stats import ks_2samp + +from river import stats + + +def test_incremental_ks_statistics(): + initial_a = np.random.normal(loc=0, scale=1, size=500) + initial_b = np.random.normal(loc=1, scale=1, size=500) + + stream_a = np.random.normal(loc=0, scale=1, size=5000) + stream_b = np.random.normal(loc=1, scale=1, size=5000) + + incremental_ks_statistics = [] + incremental_ks = stats.KolmogorovSmirnov(statistic="ks") + sliding_a = deque(initial_a) + sliding_b = deque(initial_b) + + for a, b in zip(initial_a, initial_b): + incremental_ks.update(a, b) + for a, b in zip(stream_a, stream_b): + incremental_ks.revert(sliding_a.popleft(), sliding_b.popleft()) + sliding_a.append(a) + sliding_b.append(b) + incremental_ks.update(a, b) + incremental_ks_statistics.append(incremental_ks.get()) + + ks_2samp_statistics = [] + sliding_a = deque(initial_a) + sliding_b = deque(initial_b) + + for a, b in zip(stream_a, stream_b): + sliding_a.popleft() + sliding_b.popleft() + sliding_a.append(a) + sliding_b.append(b) + ks_2samp_statistics.append(ks_2samp(sliding_a, sliding_b).statistic) + + assert np.allclose(np.array(incremental_ks_statistics), np.array(ks_2samp_statistics)) + + assert incremental_ks.test_ks_threshold(ca=incremental_ks.ca(p_value=0.05)) is True