diff --git a/docs/source/reference/tensor/statistics.rst b/docs/source/reference/tensor/statistics.rst index 2848ab8bad..adc92e0331 100644 --- a/docs/source/reference/tensor/statistics.rst +++ b/docs/source/reference/tensor/statistics.rst @@ -62,6 +62,7 @@ Statistical tests :nosignatures: mars.tensor.stats.chisquare + mars.tensor.stats.ks_2samp mars.tensor.stats.power_divergence mars.tensor.stats.ttest_1samp mars.tensor.stats.ttest_ind diff --git a/mars/_utils.pyx b/mars/_utils.pyx index c7f9dc42d8..a63a6b0cfc 100644 --- a/mars/_utils.pyx +++ b/mars/_utils.pyx @@ -14,8 +14,8 @@ import importlib import os -import pkgutil import pickle +import pkgutil import types import uuid from collections import deque @@ -27,23 +27,21 @@ import numpy as np import pandas as pd import cloudpickle cimport cython - -from .lib.mmh3 import hash as mmh_hash, hash_bytes as mmh_hash_bytes, \ - hash_from_buffer as mmh3_hash_from_buffer - try: from pandas.tseries.offsets import Tick as PDTick except ImportError: PDTick = None - try: from sqlalchemy.sql import Selectable as SASelectable from sqlalchemy.sql.sqltypes import TypeEngine as SATypeEngine except ImportError: SASelectable, SATypeEngine = None, None -cdef bint _has_cupy = pkgutil.find_loader('cupy') -cdef bint _has_cudf = pkgutil.find_loader('cudf') +from .lib.mmh3 import hash as mmh_hash, hash_bytes as mmh_hash_bytes, \ + hash_from_buffer as mmh3_hash_from_buffer + +cdef bint _has_cupy = bool(pkgutil.find_loader('cupy')) +cdef bint _has_cudf = bool(pkgutil.find_loader('cudf')) cpdef str to_str(s, encoding='utf-8'): diff --git a/mars/config.yml b/mars/config.yml new file mode 100644 index 0000000000..30366b5fad --- /dev/null +++ b/mars/config.yml @@ -0,0 +1 @@ +"@inherits": "@mars/deploy/oscar/base_config.yml" diff --git a/mars/core/entity/tests/__init__.py b/mars/core/entity/tests/__init__.py new file mode 100644 index 0000000000..c71e83c08e --- /dev/null +++ b/mars/core/entity/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/mars/core/entity/tests/test_utils.py b/mars/core/entity/tests/test_utils.py new file mode 100644 index 0000000000..674310e735 --- /dev/null +++ b/mars/core/entity/tests/test_utils.py @@ -0,0 +1,46 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
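+
+# NOTE: _TestOperand below exercises the cooperative tiling protocol:
+# mt.sort produces chunks whose shapes are unknown until executed, so
+# recursive_tile must yield those chunks first; once they run and shapes
+# are refreshed, the has_unknown_shape assertions in tile() hold.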
+
+from mars import tensor as mt
+from mars.core import recursive_tile
+from mars.tensor.operands import TensorOperand, TensorOperandMixin
+from mars.utils import has_unknown_shape
+
+
+class _TestOperand(TensorOperand, TensorOperandMixin):
+
+    @classmethod
+    def tile(cls, op: "_TestOperand"):
+        data1, data2 = op.inputs
+
+        data1 = mt.sort(data1)
+        data2 = mt.sort(data2)
+        data_all = mt.concatenate([data1, data2])
+        s1 = mt.searchsorted(data1, data_all)
+        s2 = mt.searchsorted(data2, data_all)
+        result = yield from recursive_tile(mt.concatenate([s1, s2]))
+        # data1 will be yielded by s1
+        assert not has_unknown_shape(data1)
+        assert not has_unknown_shape(data2)
+        assert not has_unknown_shape(data_all)
+        return result
+
+
+def test_recursive_tile(setup):
+    d1 = mt.random.rand(10, chunk_size=5)
+    d2 = mt.random.rand(10, chunk_size=5)
+    op = _TestOperand()
+    t = op.new_tensor([d1, d2], dtype=d1.dtype,
+                      shape=(20,), order=d1.order)
+    t.execute()
diff --git a/mars/core/entity/utils.py b/mars/core/entity/utils.py
index 26289c40d7..16b6f0a854 100644
--- a/mars/core/entity/utils.py
+++ b/mars/core/entity/utils.py
@@ -64,6 +64,7 @@ def recursive_tile(tileable: TileableType, *tileables: TileableType) -> \
         tileable = raw[0]
         tileables = raw[1:]
 
+    inputs_set = set(tileable.op.inputs)
     to_tile = [tileable] + list(tileables)
     q = [t for t in to_tile if t.is_coarse()]
     while q:
@@ -72,7 +73,20 @@ def recursive_tile(tileable: TileableType, *tileables: TileableType) -> \
         if cs:
             q.extend(cs)
             continue
-        yield from handler.tile(t.op.outputs)
+        for obj in handler.tile(t.op.outputs):
+            to_update_inputs = []
+            chunks = []
+            # collect inputs whose shape is still unknown so their chunks
+            # can be executed and their meta refreshed before tiling resumes
+            for inp in t.op.inputs:
+                if has_unknown_shape(inp):
+                    to_update_inputs.append(inp)
+                    if inp not in inputs_set:
+                        chunks.extend(inp.chunks)
+            if obj is None:
+                yield chunks + to_update_inputs
+            else:
+                yield obj + to_update_inputs
         q.pop()
 
     if not return_list:
diff --git a/mars/core/graph/builder/chunk.py b/mars/core/graph/builder/chunk.py
index 29364bdaaf..8879083c26 100644
--- a/mars/core/graph/builder/chunk.py
+++ b/mars/core/graph/builder/chunk.py
@@ -92,17 +92,15 @@ def _tile(self, visited: Set[EntityType]):
         try:
             need_process = next(tile_handler)
 
-            if need_process is None:
-                chunks = []
-            else:
-                chunks = []
+            chunks = []
+            if need_process is not None:
                 for t in need_process:
                     if isinstance(t, CHUNK_TYPE):
                         chunks.append(self._get_data(t))
                     elif isinstance(t, TILEABLE_TYPE):
                         to_update_tileables.append(self._get_data(t))
             # not finished yet
-            self._add_nodes(chunk_graph, chunks, visited)
+            self._add_nodes(chunk_graph, chunks.copy(), visited)
             next_tileable_handlers.append((tileable, tile_handler))
             # add intermediate chunks into result chunks
             # to prevent them being pruned
diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py
index 501250a6fe..cb24aaf514 100644
--- a/mars/deploy/oscar/session.py
+++ b/mars/deploy/oscar/session.py
@@ -1129,9 +1129,11 @@ def __exit__(self, *_):
             self.progress_bar.__exit__(*_)
 
     def update(self, progress: float):
+        progress = min(progress, 100)
         last_progress = self.last_progress
         if self.progress_bar:
-            self.progress_bar.update(progress - last_progress)
+            incr = max(progress - last_progress, 0)
+            self.progress_bar.update(incr)
         self.last_progress = max(last_progress, progress)
 
 
diff --git a/mars/opcodes.py b/mars/opcodes.py
index dba9317e91..9074617e3c 100644
--- a/mars/opcodes.py
+++ b/mars/opcodes.py
@@ -386,13 +386,6 @@
 FUSE = 801
 
-# control
-ENTER = 901
-LEAVE = 902
-FIX_LATEST = 903
-IF_ELSE = 904
-NEXT_ITER = 905
-
 # table like input for tensor
 TABLE_COO
= 1003 # store tensor as coo format diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py index e1009b9c24..ae76c6beae 100644 --- a/mars/services/task/analyzer/analyzer.py +++ b/mars/services/task/analyzer/analyzer.py @@ -182,7 +182,8 @@ def _build_fuse_subtask_chunk_graph(self, # the last chunk result_chunks.append(copied_fuse_chunk) fuse_to_copied[fuse_chunk] = copied_fuse_chunk - self._chunk_to_copied[chunk] = fuse_to_copied[chunk.chunk] + self._chunk_to_copied[chunk.chunk] = self._chunk_to_copied[chunk] = \ + fuse_to_copied[chunk.chunk] return subtask_chunk_graph def _gen_subtask(self, diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index d87cd852eb..f070c93975 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -254,8 +254,8 @@ async def get_next_stage_processor(self) \ # gen subtask graph available_bands = await self._get_available_band_slots() - subtask_graph = self._preprocessor.analyze( - chunk_graph, available_bands) + subtask_graph = await asyncio.to_thread( + self._preprocessor.analyze, chunk_graph, available_bands) stage_processor = TaskStageProcessor( new_task_id(), self._task, chunk_graph, subtask_graph, list(available_bands), self._get_chunk_optimization_records(), diff --git a/mars/tensor/base/searchsorted.py b/mars/tensor/base/searchsorted.py index 4ae64f7b71..5df7db017e 100644 --- a/mars/tensor/base/searchsorted.py +++ b/mars/tensor/base/searchsorted.py @@ -13,13 +13,16 @@ # limitations under the License. import itertools +from typing import Any, List, Tuple, Type import numpy as np from ... import opcodes as OperandDef +from ...core import TILEABLE_TYPE from ...core.operand import OperandStage -from ...serialization.serializables import KeyField, StringField, \ +from ...serialization.serializables import StringField, \ AnyField, Int64Field, Int32Field +from ...typing import TileableType, ChunkType from ...config import options from ...utils import has_unknown_shape from ..operands import TensorOperand, TensorOperandMixin @@ -31,46 +34,22 @@ class TensorSearchsorted(TensorOperand, TensorOperandMixin): _op_type_ = OperandDef.SEARCHSORTED - _input = KeyField('input') - _values = AnyField('values') - _side = StringField('side') - _combine_size = Int32Field('combine_size') - # offset is used only for map stage - _offset = Int64Field('offset') - - def __init__(self, values=None, side=None, combine_size=None, offset=None, **kw): - super().__init__(_values=values, _side=side, _combine_size=combine_size, - _offset=offset, **kw) + v = AnyField('v') + side = StringField('side') + combine_size = Int32Field('combine_size') + # for chunk + offset = Int64Field('offset') + size = Int64Field('size') + n_chunk = Int64Field('n_chunk') def _set_inputs(self, inputs): super()._set_inputs(inputs) - self._input = self._inputs[0] - if len(self._inputs) == 2: - self._values = self._inputs[1] - - @property - def input(self): - return self._input - - @property - def values(self): - return self._values - - @property - def side(self): - return self._side - - @property - def offset(self): - return self._offset - - @property - def combine_size(self): - return self._combine_size + if isinstance(self.v, TILEABLE_TYPE): + self.v = self._inputs[1] def __call__(self, a, v): inputs = [a] - if isinstance(v, TENSOR_TYPE): + if isinstance(v, TILEABLE_TYPE): inputs.append(v) shape = v.shape else: @@ -100,45 +79,96 @@ def _tile_one_chunk(cls, op, a, v, out): 
                                  chunks=chunks, nsplits=nsplits)
 
     @classmethod
-    def _combine_chunks(cls, to_combine, op, stage, v, idx):
+    def _combine_chunks(cls,
+                        to_combine: List[ChunkType],
+                        op_type: Type,
+                        v: Any,
+                        stage: OperandStage,
+                        chunk_index: Tuple[int]):
         from ..merge import TensorStack
 
+        dtype = np.dtype(np.intp)
         v_shape = v.shape if hasattr(v, 'shape') else ()
-        combine_op = TensorStack(axis=0, dtype=op.outputs[0].dtype)
+        combine_op = TensorStack(axis=0, dtype=dtype)
         combine_chunk = combine_op.new_chunk(to_combine, shape=v_shape)
-        chunk_op = op.copy().reset_key()
-        chunk_op.stage = stage
-        in_chunks = [combine_chunk]
-        if len(op.inputs) == 2:
-            in_chunks.append(v)
-        return chunk_op.new_chunk(in_chunks, shape=v_shape, index=idx,
-                                  order=op.outputs[0].order)
+        chunk_op = op_type(dtype=dtype, axis=(0,), stage=stage)
+        return chunk_op.new_chunk([combine_chunk], shape=v_shape, index=chunk_index,
+                                  order=TensorOrder.C_ORDER)
 
     @classmethod
-    def _tile_tree_reduction(cls, op, a, v, out):
-        if has_unknown_shape(*op.inputs):
+    def _tile_tree_reduction(cls,
+                             op: "TensorSearchsorted",
+                             a: TileableType,
+                             v: Any,
+                             out: TileableType):
+        from ..indexing import TensorSlice
+        from ..merge import TensorConcatenate
+        from ..reduction import TensorMax, TensorMin
+
+        if has_unknown_shape(a):
             yield
 
         combine_size = op.combine_size or options.combine_size
+        n_chunk = len(a.chunks)
         input_len = len(op.inputs)
         v_chunks = [v] if input_len == 1 else v.chunks
 
+        cum_nsplits = [0] + np.cumsum(a.nsplits[0]).tolist()
+
+        input_chunks = []
+        offsets = []
+        for i in range(n_chunk):
+            offset = cum_nsplits[i]
+            cur_chunk = a.chunks[i]
+            chunk_size = cur_chunk.shape[0]
+            chunks = []
+            if i > 0:
+                last_chunk = a.chunks[i - 1]
+                if last_chunk.shape[0] > 0:
+                    slice_chunk_op = TensorSlice(slices=[slice(-1, None)],
+                                                 dtype=cur_chunk.dtype)
+                    slice_chunk = slice_chunk_op.new_chunk(
+                        [last_chunk], shape=(1,), order=out.order)
+                    chunks.append(slice_chunk)
+                    chunk_size += 1
+                    offset -= 1
+            chunks.append(cur_chunk)
+            if i < n_chunk - 1:
+                next_chunk = a.chunks[i + 1]
+                if next_chunk.shape[0] > 0:
+                    slice_chunk_op = TensorSlice(slices=[slice(1)],
+                                                 dtype=cur_chunk.dtype)
+                    slice_chunk = slice_chunk_op.new_chunk(
+                        [next_chunk], shape=(1,), order=out.order)
+                    chunks.append(slice_chunk)
+                    chunk_size += 1
+
+            concat_op = TensorConcatenate(dtype=cur_chunk.dtype)
+            concat_chunk = concat_op.new_chunk(
+                chunks, shape=(chunk_size,), order=out.order,
+                index=cur_chunk.index)
+            input_chunks.append(concat_chunk)
+            offsets.append(offset)
+
         out_chunks = []
         for v_chunk in v_chunks:
-            offsets = [0] + np.cumsum(a.nsplits[0]).tolist()[:-1]
+            chunks = []
             v_shape = v_chunk.shape if hasattr(v_chunk, 'shape') else ()
             v_index = v_chunk.index if hasattr(v_chunk, 'index') else (0,)
-            chunks = []
-            for i, c in enumerate(a.chunks):
+            for inp_chunk, offset in zip(input_chunks, offsets):
                 chunk_op = op.copy().reset_key()
                 chunk_op.stage = OperandStage.map
-                chunk_op._offset = offsets[i]
-                in_chunks = [c]
-                if input_len == 2:
-                    in_chunks.append(v_chunk)
-                chunks.append(chunk_op.new_chunk(in_chunks, shape=v_shape,
-                                                 index=c.index, order=out.order))
-
+                chunk_op.offset = offset
+                chunk_op.n_chunk = n_chunk
+                chunk_op.size = a.shape[0]
+                chunk_inputs = [inp_chunk]
+                if input_len > 1:
+                    chunk_inputs.append(v_chunk)
+                map_chunk = chunk_op.new_chunk(
+                    chunk_inputs, shape=v_shape, index=inp_chunk.index,
+                    order=out.order)
+                chunks.append(map_chunk)
+
+            op_type = TensorMax if op.side == 'right' else TensorMin
             while len(chunks) > combine_size:
                 new_chunks = []
                 it = itertools.count(0)
@@ -149,10
+179,12 @@ def _tile_tree_reduction(cls, op, a, v, out): break new_chunks.append( - cls._combine_chunks(to_combine, op, OperandStage.combine, v_chunk, (j,))) + cls._combine_chunks(to_combine, op_type, v_chunk, + OperandStage.combine, (j,))) chunks = new_chunks - chunk = cls._combine_chunks(chunks, op, OperandStage.reduce, v_chunk, v_index) + chunk = cls._combine_chunks(chunks, op_type, v_chunk, + OperandStage.agg, v_index) out_chunks.append(chunk) new_op = op.copy().reset_key() @@ -166,7 +198,7 @@ def tile(cls, op): out = op.outputs[0] input_len = len(op.inputs) if input_len == 1: - v = op.values + v = op.v else: v = op.inputs[1] @@ -179,64 +211,51 @@ def _execute_without_stage(cls, xp, a, v, op): return xp.searchsorted(a, v, side=op.side) @classmethod - def _execute_map(cls, xp, a, v, op): - # in the map phase, calculate the indices and positions - # for instance, a=[1, 4, 6], v=5, return will be (2, 6) - indices = xp.atleast_1d(xp.searchsorted(a, v, side=op.side)) - data_indices = indices.copy() - # if the value is larger than all data - # for instance, a=[1, 4, 6], v=7 - # return will be (2, 6), not (3, 6), thus needs to subtract 1 - data_indices = xp.subtract(data_indices, 1, out=data_indices, - where=data_indices >= len(a)) - data = a[data_indices] - if op.offset > 0: - indices = xp.add(indices, op.offset, out=indices) - if np.isscalar(v): - indices, data = indices[0], data[0] - - return indices, data - - @classmethod - def _execute_combine(cls, xp, a, v, op): - inp_indices, inp_data = a - if np.isscalar(v): - ind = xp.searchsorted(inp_data, v, side=op.side) - if ind >= len(inp_data): - ind -= 1 - return inp_indices[ind], inp_data[ind] + def _execute_map(cls, + xp: Any, + a: np.ndarray, + v: Any, + op: "TensorSearchsorted"): + out = op.outputs[0] + i = out.index[0] + side = op.side + + raw_v = v + v = xp.atleast_1d(v) + searched = xp.searchsorted(a, v, side=op.side) + xp.add(searched, op.offset, out=searched) + a_min, a_max = a[0], a[-1] + if i == 0: + # the first chunk + if a_min == a_max: + miss = v > a_max + else: + miss = v > a_max if side == 'left' else v >= a_max + elif i == op.n_chunk - 1: + # the last chunk + if a_min == a_max: + miss = v < a_min + else: + miss = v <= a_min if side == 'left' else v < a_min else: - ret_indices = np.empty(v.shape, dtype=np.intp) - ret_data = np.empty(v.shape, dtype=inp_data.dtype) - for idx in itertools.product(*(range(s) for s in v.shape)): - ind = xp.searchsorted(inp_data[(slice(None),) + idx], v[idx], side=op.side) - if ind >= len(inp_indices): - ind -= 1 - ret_indices[idx] = inp_indices[(ind,) + idx] - ret_data[idx] = inp_data[(ind,) + idx] - return ret_indices, ret_data - - @classmethod - def _execute_reduce(cls, xp, a, v, op): - inp_indices, inp_data = a - if np.isscalar(v): - ind = xp.searchsorted(inp_data, v, side=op.side) - if ind >= len(inp_indices): - ind -= 1 - return inp_indices[ind] + if side == 'left' and a_min < a_max: + miss = (v <= a_min) | (v > a_max) + elif a_min < a_max: + miss = (v < a_min) | (v >= a_max) + else: + assert a_min == a_max + miss = v != a_min + if side == 'right': + searched[miss] = -1 else: - indices = np.empty(v.shape, dtype=np.intp) - for idx in itertools.product(*(range(s) for s in v.shape)): - ind = xp.searchsorted(inp_data[(slice(None),) + idx], v[idx], side=op.side) - if ind >= len(inp_indices): - ind -= 1 - indices[idx] = inp_indices[(ind,) + idx] - return indices + searched[miss] = op.size + 1 + + return searched[0] if np.isscalar(raw_v) else searched @classmethod def execute(cls, ctx, op): a = 
ctx[op.inputs[0].key] - v = ctx[op.inputs[1].key] if len(op.inputs) == 2 else op.values + v = ctx[op.inputs[1].key] if len(op.inputs) == 2 else op.v data = [] if isinstance(a, tuple): @@ -259,12 +278,9 @@ def execute(cls, ctx, op): with device(device_id): if op.stage is None: ret = cls._execute_without_stage(xp, a, v, op) - elif op.stage == OperandStage.map: - ret = cls._execute_map(xp, a, v, op) - elif op.stage == OperandStage.combine: - ret = cls._execute_combine(xp, a, v, op) else: - ret = cls._execute_reduce(xp, a, v, op) + assert op.stage == OperandStage.map + ret = cls._execute_map(xp, a, v, op) ctx[op.outputs[0].key] = ret @@ -353,6 +369,6 @@ def searchsorted(a, v, side='left', sorter=None, combine_size=None): if not np.isscalar(v): v = astensor(v) - op = TensorSearchsorted(values=v, side=side, dtype=np.dtype(np.intp), + op = TensorSearchsorted(v=v, side=side, dtype=np.dtype(np.intp), combine_size=combine_size) return op(a, v) diff --git a/mars/tensor/base/tests/test_base.py b/mars/tensor/base/tests/test_base.py index cd0516328c..3ea329b7d9 100644 --- a/mars/tensor/base/tests/test_base.py +++ b/mars/tensor/base/tests/test_base.py @@ -461,7 +461,7 @@ def test_searchsorted(): assert t1.nsplits == () assert len(t1.chunks) == 1 - assert t1.chunks[0].op.stage == OperandStage.reduce + assert t1.chunks[0].op.stage == OperandStage.agg with pytest.raises(ValueError): searchsorted(np.random.randint(10, size=(14, 14)), 1) diff --git a/mars/tensor/base/tests/test_base_execute.py b/mars/tensor/base/tests/test_base_execute.py index 8e52c92f71..a5ed26321d 100644 --- a/mars/tensor/base/tests/test_base_execute.py +++ b/mars/tensor/base/tests/test_base_execute.py @@ -919,6 +919,18 @@ def test_searchsorted_execution(setup): expected = np.searchsorted(raw3, 20, sorter=order) np.testing.assert_array_equal(res, expected) + # all data same + raw4 = np.ones(8) + arr = tensor(raw4, chunk_size=2) + + for val in (0, 1, 2): + for side in ('left', 'right'): + t15 = searchsorted(arr, val, side=side) + + res = t15.execute().fetch() + expected = np.searchsorted(raw4, val, side=side) + np.testing.assert_array_equal(res, expected) + def test_unique_execution(setup): rs = np.random.RandomState(0) diff --git a/mars/tensor/stats/__init__.py b/mars/tensor/stats/__init__.py index 449c67a06f..4a4a826b39 100644 --- a/mars/tensor/stats/__init__.py +++ b/mars/tensor/stats/__init__.py @@ -16,3 +16,4 @@ from .chisquare import chisquare from .power_divergence import power_divergence from .ttest import ttest_1samp, ttest_ind, ttest_ind_from_stats, ttest_rel +from .ks_2samp import ks_2samp diff --git a/mars/tensor/stats/core.py b/mars/tensor/stats/core.py new file mode 100644 index 0000000000..f743b78f29 --- /dev/null +++ b/mars/tensor/stats/core.py @@ -0,0 +1,18 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
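+
+# Shared result type for the KS tests: it mirrors the ``KstestResult``
+# namedtuple returned by scipy.stats, so fetched results compare equal
+# field-for-field with scipy's.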
+ +from collections import namedtuple + + +KstestResult = namedtuple('KstestResult', ('statistic', 'pvalue')) diff --git a/mars/tensor/stats/ks_2samp.py b/mars/tensor/stats/ks_2samp.py new file mode 100644 index 0000000000..ab6daec8cb --- /dev/null +++ b/mars/tensor/stats/ks_2samp.py @@ -0,0 +1,483 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math +import warnings +from math import gcd +from typing import Union + +import numpy as np +from scipy import special +from scipy.stats import distributions + +from ... import tensor as mt +from ...core import ExecutableTuple +from ...typing import TileableType +from .core import KstestResult + + +Ks_2sampResult = KstestResult + + +def _compute_prob_inside_method(m, n, g, h): # pragma: no cover + """ + Count the proportion of paths that stay strictly inside two diagonal lines. + + Parameters + ---------- + m : integer + m > 0 + n : integer + n > 0 + g : integer + g is greatest common divisor of m and n + h : integer + 0 <= h <= lcm(m,n) + + Returns + ------- + p : float + The proportion of paths that stay inside the two lines. + + + Count the integer lattice paths from (0, 0) to (m, n) which satisfy + |x/m - y/n| < h / lcm(m, n). + The paths make steps of size +1 in either positive x or positive y directions. + + We generally follow Hodges' treatment of Drion/Gnedenko/Korolyuk. + Hodges, J.L. Jr., + "The Significance Probability of the Smirnov Two-Sample Test," + Arkiv fiur Matematik, 3, No. 43 (1958), 469-86. + + """ + # Probability is symmetrical in m, n. Computation below uses m >= n. + if m < n: + m, n = n, m + mg = m // g + ng = n // g + + # Count the integer lattice paths from (0, 0) to (m, n) which satisfy + # |nx/g - my/g| < h. + # Compute matrix A such that: + # A(x, 0) = A(0, y) = 1 + # A(x, y) = A(x, y-1) + A(x-1, y), for x,y>=1, except that + # A(x, y) = 0 if |x/m - y/n|>= h + # Probability is A(m, n)/binom(m+n, n) + # Optimizations exist for m==n, m==n*p. + # Only need to preserve a single column of A, and only a sliding window of it. + # minj keeps track of the slide. + minj, maxj = 0, min(int(np.ceil(h / mg)), n + 1) + curlen = maxj - minj + # Make a vector long enough to hold maximum window needed. + lenA = min(2 * maxj + 2, n + 1) + # This is an integer calculation, but the entries are essentially + # binomial coefficients, hence grow quickly. + # Scaling after each column is computed avoids dividing by a + # large binomial coefficient at the end, but is not sufficient to avoid + # the large dynamic range which appears during the calculation. + # Instead we rescale based on the magnitude of the right most term in + # the column and keep track of an exponent separately and apply + # it at the end of the calculation. Similarly when multiplying by + # the binomial coefficient + dtype = np.float64 + A = np.zeros(lenA, dtype=dtype) + # Initialize the first column + A[minj:maxj] = 1 + expnt = 0 + for i in range(1, m + 1): + # Generate the next column. 
+ # First calculate the sliding window + lastminj, lastlen = minj, curlen + minj = max(int(np.floor((ng * i - h) / mg)) + 1, 0) + minj = min(minj, n) + maxj = min(int(np.ceil((ng * i + h) / mg)), n + 1) + if maxj <= minj: + return 0 + # Now fill in the values + A[0:maxj - minj] = np.cumsum(A[minj - lastminj:maxj - lastminj]) + curlen = maxj - minj + if lastlen > curlen: + # Set some carried-over elements to 0 + A[maxj - minj:maxj - minj + (lastlen - curlen)] = 0 + # Rescale if the right most value is over 2**900 + val = A[maxj - minj - 1] + _, valexpt = math.frexp(val) + if valexpt > 900: + # Scaling to bring down to about 2**800 appears + # sufficient for sizes under 10000. + valexpt -= 800 + A = np.ldexp(A, -valexpt) + expnt += valexpt + + val = A[maxj - minj - 1] + # Now divide by the binomial (m+n)!/m!/n! + for i in range(1, n + 1): + val = (val * i) / (m + i) + _, valexpt = math.frexp(val) + if valexpt < -128: + val = np.ldexp(val, -valexpt) + expnt += valexpt + # Finally scale if needed. + return np.ldexp(val, expnt) + + +def _compute_prob_outside_square(n, h): # pragma: no cover + """ + Compute the proportion of paths that pass outside the two diagonal lines. + + Parameters + ---------- + n : integer + n > 0 + h : integer + 0 <= h <= n + + Returns + ------- + p : float + The proportion of paths that pass outside the lines x-y = +/-h. + + """ + # Compute Pr(D_{n,n} >= h/n) + # Prob = 2 * ( binom(2n, n-h) - binom(2n, n-2a) + binom(2n, n-3a) - ... ) / binom(2n, n) + # This formulation exhibits subtractive cancellation. + # Instead divide each term by binom(2n, n), then factor common terms + # and use a Horner-like algorithm + # P = 2 * A0 * (1 - A1*(1 - A2*(1 - A3*(1 - A4*(...))))) + + P = 0.0 + k = int(np.floor(n / h)) + while k >= 0: + p1 = 1.0 + # Each of the Ai terms has numerator and denominator with h simple terms. + for j in range(h): + p1 = (n - k * h - j) * p1 / (n + k * h + j + 1) + P = p1 * (1.0 - P) + k -= 1 + return 2 * P + + +def _count_paths_outside_method(m, n, g, h): # pragma: no cover + """ + Count the number of paths that pass outside the specified diagonal. + + Parameters + ---------- + m : integer + m > 0 + n : integer + n > 0 + g : integer + g is greatest common divisor of m and n + h : integer + 0 <= h <= lcm(m,n) + + Returns + ------- + p : float + The number of paths that go low. + The calculation may overflow - check for a finite answer. + + Raises + ------ + FloatingPointError: Raised if the intermediate computation goes outside + the range of a float. + + Notes + ----- + Count the integer lattice paths from (0, 0) to (m, n), which at some + point (x, y) along the path, satisfy: + m*y <= n*x - h*g + The paths make steps of size +1 in either positive x or positive y directions. + + We generally follow Hodges' treatment of Drion/Gnedenko/Korolyuk. + Hodges, J.L. Jr., + "The Significance Probability of the Smirnov Two-Sample Test," + Arkiv fiur Matematik, 3, No. 43 (1958), 469-86. + + """ + # Compute #paths which stay lower than x/m-y/n = h/lcm(m,n) + # B(x, y) = #{paths from (0,0) to (x,y) without previously crossing the boundary} + # = binom(x, y) - #{paths which already reached the boundary} + # Multiply by the number of path extensions going from (x, y) to (m, n) + # Sum. + + # Probability is symmetrical in m, n. Computation below assumes m >= n. + if m < n: + m, n = n, m + mg = m // g + ng = n // g + + # Not every x needs to be considered. + # xj holds the list of x values to be checked. 
+    # Wherever n*x/m + ng*h crosses an integer
+    lxj = n + (mg-h)//mg
+    xj = [(h + mg * j + ng-1)//ng for j in range(lxj)]
+    # B is an array just holding a few values of B(x,y), the ones needed.
+    # B[j] == B(x_j, j)
+    if lxj == 0:
+        return np.round(special.binom(m + n, n))
+    B = np.zeros(lxj)
+    B[0] = 1
+    # Compute the B(x, y) terms
+    # The binomial coefficient is an integer, but special.binom() may return a float.
+    # Round it to the nearest integer.
+    for j in range(1, lxj):
+        Bj = np.round(special.binom(xj[j] + j, j))
+        if not np.isfinite(Bj):
+            raise FloatingPointError()
+        for i in range(j):
+            bin = np.round(special.binom(xj[j] - xj[i] + j - i, j-i))  # pylint: disable=redefined-builtin
+            Bj -= bin * B[i]
+        B[j] = Bj
+        if not np.isfinite(Bj):
+            raise FloatingPointError()
+    # Compute the number of path extensions...
+    num_paths = 0
+    for j in range(lxj):
+        bin = np.round(special.binom((m-xj[j]) + (n - j), n-j))
+        term = B[j] * bin
+        if not np.isfinite(term):
+            raise FloatingPointError()
+        num_paths += term
+    return np.round(num_paths)
+
+
+def _attempt_exact_2kssamp(n1, n2, g, d, alternative):  # pragma: no cover
+    """Attempts to compute the exact two-sample probability.
+
+    n1, n2 are the sample sizes
+    g is the gcd(n1, n2)
+    d is the computed max difference in ECDFs
+
+    Returns (success, d, probability)
+    """
+    lcm = (n1 // g) * n2
+    h = int(np.round(d * lcm))
+    d = h * 1.0 / lcm
+    if h == 0:
+        return True, d, 1.0
+    saw_fp_error, prob = False, np.nan
+    try:
+        if alternative == 'two-sided':
+            if n1 == n2:
+                prob = _compute_prob_outside_square(n1, h)
+            else:
+                prob = 1 - _compute_prob_inside_method(n1, n2, g, h)
+        else:
+            if n1 == n2:
+                # prob = binom(2n, n-h) / binom(2n, n)
+                # Evaluating in that form incurs roundoff errors
+                # from special.binom. Instead calculate directly
+                jrange = np.arange(h)
+                prob = np.prod((n1 - jrange) / (n1 + jrange + 1.0))
+            else:
+                num_paths = _count_paths_outside_method(n1, n2, g, h)
+                bin = special.binom(n1 + n2, n1)  # pylint: disable=redefined-builtin
+                if not np.isfinite(bin) or not np.isfinite(num_paths) or num_paths > bin:
+                    saw_fp_error = True
+                else:
+                    prob = num_paths / bin
+
+    except FloatingPointError:
+        saw_fp_error = True
+
+    if saw_fp_error:
+        return False, d, np.nan
+    if not (0 <= prob <= 1):
+        return False, d, prob
+    return True, d, prob
+
+
+def _calc_prob(d, n1, n2, alternative, mode):  # pragma: no cover
+    MAX_AUTO_N = 10000  # 'auto' will attempt to be exact if n1,n2 <= MAX_AUTO_N
+
+    g = gcd(n1, n2)
+    n1g = n1 // g
+    n2g = n2 // g
+    prob = -mt.inf
+    original_mode = mode
+    if mode == 'auto':
+        mode = 'exact' if max(n1, n2) <= MAX_AUTO_N else 'asymp'
+    elif mode == 'exact':
+        # If lcm(n1, n2) is too big, switch from exact to asymp
+        if n1g >= np.iinfo(np.int_).max / n2g:
+            mode = 'asymp'
+            warnings.warn(
+                f"Exact ks_2samp calculation not possible with sample sizes "
+                f"{n1} and {n2}. Switching to 'asymp'.", RuntimeWarning)
+
+    if mode == 'exact':
+        success, d, prob = _attempt_exact_2kssamp(n1, n2, g, d, alternative)
+        if not success:
+            mode = 'asymp'
+            if original_mode == 'exact':
+                warnings.warn(f"ks_2samp: Exact calculation unsuccessful. "
+                              f"Switching to mode={mode}.", RuntimeWarning)
+
+    if mode == 'asymp':
+        # The product n1*n2 is large. Use Smirnov's asymptotic formula.
+ # Ensure float to avoid overflow in multiplication + # sorted because the one-sided formula is not symmetric in n1, n2 + m, n = sorted([float(n1), float(n2)], reverse=True) + en = m * n / (m + n) + if alternative == 'two-sided': + prob = distributions.kstwo.sf(d, np.round(en)) + else: + z = np.sqrt(en) * d + # Use Hodges' suggested approximation Eqn 5.3 + # Requires m to be the larger of (n1, n2) + expt = -2 * z**2 - 2 * z * (m + 2*n)/np.sqrt(m*n*(m+n))/3.0 + prob = np.exp(expt) + + return np.clip(prob, 0, 1) + + +def ks_2samp(data1: Union[np.ndarray, list, TileableType], + data2: Union[np.ndarray, list, TileableType], + alternative: str = 'two-sided', + mode: str = 'auto'): + """ + Compute the Kolmogorov-Smirnov statistic on 2 samples. + + This is a two-sided test for the null hypothesis that 2 independent samples + are drawn from the same continuous distribution. The alternative hypothesis + can be either 'two-sided' (default), 'less' or 'greater'. + + Parameters + ---------- + data1, data2 : array_like, 1-Dimensional + Two arrays of sample observations assumed to be drawn from a continuous + distribution, sample sizes can be different. + alternative : {'two-sided', 'less', 'greater'}, optional + Defines the alternative hypothesis. + The following options are available (default is 'two-sided'): + + * 'two-sided' + * 'less': one-sided, see explanation in Notes + * 'greater': one-sided, see explanation in Notes + mode : {'auto', 'exact', 'asymp'}, optional + Defines the method used for calculating the p-value. + The following options are available (default is 'auto'): + + * 'auto' : use 'exact' for small size arrays, 'asymp' for large + * 'exact' : use exact distribution of test statistic + * 'asymp' : use asymptotic distribution of test statistic + + Returns + ------- + statistic : float + KS statistic. + pvalue : float + Two-tailed p-value. + + See Also + -------- + kstest, ks_1samp, epps_singleton_2samp, anderson_ksamp + + Notes + ----- + This tests whether 2 samples are drawn from the same distribution. Note + that, like in the case of the one-sample KS test, the distribution is + assumed to be continuous. + + In the one-sided test, the alternative is that the empirical + cumulative distribution function F(x) of the data1 variable is "less" + or "greater" than the empirical cumulative distribution function G(x) + of the data2 variable, ``F(x)<=G(x)``, resp. ``F(x)>=G(x)``. + + If the KS statistic is small or the p-value is high, then we cannot + reject the hypothesis that the distributions of the two samples + are the same. + + If the mode is 'auto', the computation is exact if the sample sizes are + less than 10000. For larger sizes, the computation uses the + Kolmogorov-Smirnov distributions to compute an approximate value. + + The 'two-sided' 'exact' computation computes the complementary probability + and then subtracts from 1. As such, the minimum probability it can return + is about 1e-16. While the algorithm itself is exact, numerical + errors may accumulate for large sample sizes. It is most suited to + situations in which one of the sample sizes is only a few thousand. + + We generally follow Hodges' treatment of Drion/Gnedenko/Korolyuk [1]_. + + References + ---------- + .. [1] Hodges, J.L. Jr., "The Significance Probability of the Smirnov + Two-Sample Test," Arkiv fiur Matematik, 3, No. 43 (1958), 469-86. 
+ + + Examples + -------- + >>> import numpy as np + >>> from scipy import stats + >>> import mars.tensor as mt + >>> from mars.tensor.stats import ks_2samp + >>> np.random.seed(12345678) #fix random seed to get the same result + >>> n1 = 200 # size of first sample + >>> n2 = 300 # size of second sample + + For a different distribution, we can reject the null hypothesis since the + pvalue is below 1%: + + >>> rvs1 = stats.norm.rvs(size=n1, loc=0., scale=1) + >>> rvs2 = stats.norm.rvs(size=n2, loc=0.5, scale=1.5) + >>> ks_2samp(rvs1, rvs2).execute() + KstestResult(statistic=0.20833333333333337, pvalue=5.1292795978041816e-05) + + For a slightly different distribution, we cannot reject the null hypothesis + at a 10% or lower alpha since the p-value at 0.144 is higher than 10% + + >>> rvs3 = stats.norm.rvs(size=n2, loc=0.01, scale=1.0) + >>> ks_2samp(rvs1, rvs3).execute() + KstestResult(statistic=0.10333333333333333, pvalue=0.14691437867433788) + + For an identical distribution, we cannot reject the null hypothesis since + the p-value is high, 41%: + + >>> rvs4 = stats.norm.rvs(size=n2, loc=0.0, scale=1.0) + >>> ks_2samp(rvs1, rvs4).execute() + KstestResult(statistic=0.07999999999999996, pvalue=0.4115432028915931) + + """ + + if mode not in ['auto', 'exact', 'asymp']: + raise ValueError(f'Invalid value for mode: {mode}') + alternative = {'t': 'two-sided', 'g': 'greater', 'l': 'less'}.get( + alternative.lower()[0], alternative) + if alternative not in ['two-sided', 'less', 'greater']: + raise ValueError(f'Invalid value for alternative: {alternative}') + data1 = mt.asarray(data1) + data2 = mt.asarray(data2) + data1 = mt.sort(data1) + data2 = mt.sort(data2) + n1 = data1.shape[0] + n2 = data2.shape[0] + if min(n1, n2) == 0: + raise ValueError('Data passed to ks_2samp must not be empty') + + data_all = mt.concatenate([data1, data2]) + # using searchsorted solves equal data problem + cdf1 = mt.searchsorted(data1, data_all, side='right') / n1 + cdf2 = mt.searchsorted(data2, data_all, side='right') / n2 + cddiffs = cdf1 - cdf2 + minS = mt.clip(-mt.min(cddiffs), 0, 1) # Ensure sign of minS is not negative. 
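+    # minS and maxS are the one-sided statistics (the largest excess of
+    # cdf2 over cdf1 and of cdf1 over cdf2); 'two-sided' takes their maximum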
+ maxS = mt.max(cddiffs) + alt2Dvalue = {'less': minS, 'greater': maxS, 'two-sided': mt.maximum(minS, maxS)} + d = alt2Dvalue[alternative] + prob = d.map_chunk(_calc_prob, args=(n1, n2, alternative, mode), + elementwise=True, dtype=d.dtype) + + return ExecutableTuple(Ks_2sampResult(d, prob)) diff --git a/mars/tensor/stats/tests/test_stats_execute.py b/mars/tensor/stats/tests/test_stats_execute.py index 3d313fdbac..7ca009a01e 100644 --- a/mars/tensor/stats/tests/test_stats_execute.py +++ b/mars/tensor/stats/tests/test_stats_execute.py @@ -22,10 +22,12 @@ entropy as sp_entropy, power_divergence as sp_power_divergence, chisquare as sp_chisquare, + ks_2samp as sp_ks_2samp, ttest_rel as sp_ttest_rel, ttest_ind as sp_ttest_ind, ttest_ind_from_stats as sp_ttest_ind_from_stats, ttest_1samp as sp_ttest_1samp, + norm as sp_norm ) from mars.lib.version import parse as parse_version @@ -33,6 +35,7 @@ from mars.tensor.stats import ( entropy, power_divergence, chisquare, ttest_ind, ttest_rel, ttest_1samp, ttest_ind_from_stats, + ks_2samp ) @@ -192,3 +195,28 @@ def test_t_test_execution(setup): expected = sp_func(fa_raw, fb_raw) np.testing.assert_almost_equal(expected[0], result[0]) np.testing.assert_almost_equal(expected[1], result[1]) + + +@pytest.mark.parametrize('chunk_size', [5, 15]) +def test_ks_2samp(setup, chunk_size): + n1 = 10 + n2 = 15 + rs = np.random.RandomState(0) + rvs1 = sp_norm.rvs(size=n1, loc=0., scale=1, random_state=rs) + rvs2 = sp_norm.rvs(size=n2, loc=0.5, scale=1.5, random_state=rs) + + d1 = tensor(rvs1, chunk_size=chunk_size) + d2 = tensor(rvs2, chunk_size=chunk_size) + + result = ks_2samp(d1, d2).execute().fetch() + expected = sp_ks_2samp(rvs1, rvs2) + assert result == expected + + with pytest.raises(ValueError): + ks_2samp(d1, d2, alternative='unknown') + + with pytest.raises(ValueError): + ks_2samp(d1, d2, mode='unknown') + + with pytest.raises(ValueError): + ks_2samp(d1, []) diff --git a/setup.cfg b/setup.cfg index 8a5b18526b..f5f6efc363 100644 --- a/setup.cfg +++ b/setup.cfg @@ -153,4 +153,4 @@ exclude = [codespell] ignore-words-list = hist,rcall,fpr,ser,nd,inout,ot,Ba,ba,asend,hart,coo,splitted,datas,fro -skip = ./build,./docs/build,./mars/lib,node_modules,static,generated,*.po,*.ts,*.json,*.c,*.cfg +skip = .idea,.git,./build,./docs/build,./mars/lib,node_modules,static,generated,*.po,*.ts,*.json,*.c,*.cfg
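Usage sketch for the new `mars.tensor.stats.ks_2samp` (a rough example, not part of the patch: it assumes a local session created via `mars.new_session()` and relies on the `execute().fetch()` pattern used in the tests above; the chunk sizes are illustrative):

    import numpy as np
    import mars
    import mars.tensor as mt
    from mars.tensor.stats import ks_2samp

    mars.new_session()  # local session; a deployed cluster works the same way

    rs = np.random.RandomState(12345678)
    rvs1 = rs.normal(loc=0.0, scale=1.0, size=200)
    rvs2 = rs.normal(loc=0.5, scale=1.5, size=300)

    # multi-chunk inputs exercise the new tree-reduction searchsorted path
    d1 = mt.tensor(rvs1, chunk_size=64)
    d2 = mt.tensor(rvs2, chunk_size=64)

    statistic, pvalue = ks_2samp(d1, d2).execute().fetch()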