Skip to content

Commit

Permalink
Implements mars.tensor.stats.ks_2samp (#2324)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuye (Chris) Qin authored Aug 11, 2021
1 parent 3e308f2 commit e56ce37
Show file tree
Hide file tree
Showing 19 changed files with 767 additions and 144 deletions.
1 change: 1 addition & 0 deletions docs/source/reference/tensor/statistics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Statistical tests
:nosignatures:

mars.tensor.stats.chisquare
mars.tensor.stats.ks_2samp
mars.tensor.stats.power_divergence
mars.tensor.stats.ttest_1samp
mars.tensor.stats.ttest_ind
Expand Down
14 changes: 6 additions & 8 deletions mars/_utils.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

import importlib
import os
import pkgutil
import pickle
import pkgutil
import types
import uuid
from collections import deque
Expand All @@ -27,23 +27,21 @@ import numpy as np
import pandas as pd
import cloudpickle
cimport cython

from .lib.mmh3 import hash as mmh_hash, hash_bytes as mmh_hash_bytes, \
hash_from_buffer as mmh3_hash_from_buffer

try:
from pandas.tseries.offsets import Tick as PDTick
except ImportError:
PDTick = None

try:
from sqlalchemy.sql import Selectable as SASelectable
from sqlalchemy.sql.sqltypes import TypeEngine as SATypeEngine
except ImportError:
SASelectable, SATypeEngine = None, None

cdef bint _has_cupy = pkgutil.find_loader('cupy')
cdef bint _has_cudf = pkgutil.find_loader('cudf')
from .lib.mmh3 import hash as mmh_hash, hash_bytes as mmh_hash_bytes, \
hash_from_buffer as mmh3_hash_from_buffer

cdef bint _has_cupy = bool(pkgutil.find_loader('cupy'))
cdef bint _has_cudf = bool(pkgutil.find_loader('cudf'))


cpdef str to_str(s, encoding='utf-8'):
Expand Down
1 change: 1 addition & 0 deletions mars/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"@inherits": "@mars/deploy/oscar/base_config.yml"
13 changes: 13 additions & 0 deletions mars/core/entity/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
46 changes: 46 additions & 0 deletions mars/core/entity/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from mars import tensor as mt
from mars.core import recursive_tile
from mars.tensor.operands import TensorOperand, TensorOperandMixin
from mars.utils import has_unknown_shape


class _TestOperand(TensorOperand, TensorOperandMixin):

@classmethod
def tile(cls, op: "_TestOperand"):
data1, data2 = op.inputs

data1 = mt.sort(data1)
data2 = mt.sort(data2)
data_all = mt.concatenate([data1, data2])
s1 = mt.searchsorted(data1, data_all)
s2 = mt.searchsorted(data2, data_all)
result = yield from recursive_tile(mt.concatenate([s1, s2]))
# data1 will be yield by s1
assert not has_unknown_shape(data1)
assert not has_unknown_shape(data2)
assert not has_unknown_shape(data_all)
return result


def test_recursive_tile(setup):
d1 = mt.random.rand(10, chunk_size=5)
d2 = mt.random.rand(10, chunk_size=5)
op = _TestOperand()
t = op.new_tensor([d1, d2], dtype=d1.dtype,
shape=(20,), order=d1.order)
t.execute()
14 changes: 13 additions & 1 deletion mars/core/entity/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def recursive_tile(tileable: TileableType, *tileables: TileableType) -> \
tileable = raw[0]
tileables = raw[1:]

inputs_set = set(tileable.op.inputs)
to_tile = [tileable] + list(tileables)
q = [t for t in to_tile if t.is_coarse()]
while q:
Expand All @@ -72,7 +73,18 @@ def recursive_tile(tileable: TileableType, *tileables: TileableType) -> \
if cs:
q.extend(cs)
continue
yield from handler.tile(t.op.outputs)
for obj in handler.tile(t.op.outputs):
to_update_inputs = []
chunks = []
for inp in t.op.inputs:
if has_unknown_shape(inp):
to_update_inputs.append(inp)
if inp not in inputs_set:
chunks.extend(inp.chunks)
if obj is None:
yield chunks + to_update_inputs
else:
yield obj + to_update_inputs
q.pop()

if not return_list:
Expand Down
8 changes: 3 additions & 5 deletions mars/core/graph/builder/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,17 +92,15 @@ def _tile(self,
visited: Set[EntityType]):
try:
need_process = next(tile_handler)
if need_process is None:
chunks = []
else:
chunks = []
chunks = []
if need_process is not None:
for t in need_process:
if isinstance(t, CHUNK_TYPE):
chunks.append(self._get_data(t))
elif isinstance(t, TILEABLE_TYPE):
to_update_tileables.append(self._get_data(t))
# not finished yet
self._add_nodes(chunk_graph, chunks, visited)
self._add_nodes(chunk_graph, chunks.copy(), visited)
next_tileable_handlers.append((tileable, tile_handler))
# add intermediate chunks into result chunks
# to prevent them being pruned
Expand Down
4 changes: 3 additions & 1 deletion mars/deploy/oscar/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1129,9 +1129,11 @@ def __exit__(self, *_):
self.progress_bar.__exit__(*_)

def update(self, progress: float):
progress = min(progress, 100)
last_progress = self.last_progress
if self.progress_bar:
self.progress_bar.update(progress - last_progress)
incr = max(progress - last_progress, 0)
self.progress_bar.update(incr)
self.last_progress = max(last_progress, progress)


Expand Down
7 changes: 0 additions & 7 deletions mars/opcodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,6 @@

FUSE = 801

# control
ENTER = 901
LEAVE = 902
FIX_LATEST = 903
IF_ELSE = 904
NEXT_ITER = 905

# table like input for tensor
TABLE_COO = 1003
# store tensor as coo format
Expand Down
3 changes: 2 additions & 1 deletion mars/services/task/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ def _build_fuse_subtask_chunk_graph(self,
# the last chunk
result_chunks.append(copied_fuse_chunk)
fuse_to_copied[fuse_chunk] = copied_fuse_chunk
self._chunk_to_copied[chunk] = fuse_to_copied[chunk.chunk]
self._chunk_to_copied[chunk.chunk] = self._chunk_to_copied[chunk] = \
fuse_to_copied[chunk.chunk]
return subtask_chunk_graph

def _gen_subtask(self,
Expand Down
4 changes: 2 additions & 2 deletions mars/services/task/supervisor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ async def get_next_stage_processor(self) \

# gen subtask graph
available_bands = await self._get_available_band_slots()
subtask_graph = self._preprocessor.analyze(
chunk_graph, available_bands)
subtask_graph = await asyncio.to_thread(
self._preprocessor.analyze, chunk_graph, available_bands)
stage_processor = TaskStageProcessor(
new_task_id(), self._task, chunk_graph, subtask_graph,
list(available_bands), self._get_chunk_optimization_records(),
Expand Down
Loading

0 comments on commit e56ce37

Please sign in to comment.