diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py
index 8e18656fa2..5533f52e33 100644
--- a/mars/dataframe/groupby/aggregation.py
+++ b/mars/dataframe/groupby/aggregation.py
@@ -55,6 +55,11 @@
 from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs
 from ..utils import parse_index, build_concatenated_rows_frame, is_cudf
 from .core import DataFrameGroupByOperand
+from .sort import (
+    DataFramePSRSGroupbySample,
+    DataFrameGroupbyConcatPivot,
+    DataFrameGroupbySortShuffle,
+)
 
 cp = lazy_import("cupy", rename="cp")
 cudf = lazy_import("cudf")
@@ -293,6 +298,117 @@ def __call__(self, groupby):
         else:
             return self._call_series(groupby, df)
 
+    @classmethod
+    def partition_merge_data(
+        cls,
+        op: "DataFrameGroupByAgg",
+        partition_chunks: List[ChunkType],
+        proxy_chunk: ChunkType,
+    ):
+        # stage 4: all *ith* classes are gathered and merged
+        partition_sort_chunks = []
+        properties = dict(by=op.groupby_params["by"], gpu=op.is_gpu())
+        out_df = op.outputs[0]
+
+        for i, partition_chunk in enumerate(partition_chunks):
+            output_types = (
+                [OutputType.dataframe_groupby]
+                if out_df.ndim == 2
+                else [OutputType.series_groupby]
+            )
+            partition_shuffle_reduce = DataFrameGroupbySortShuffle(
+                stage=OperandStage.reduce,
+                reducer_index=(i, 0),
+                output_types=output_types,
+                **properties,
+            )
+            chunk_shape = list(partition_chunk.shape)
+            chunk_shape[0] = np.nan
+
+            kw = dict(
+                shape=tuple(chunk_shape),
+                index=partition_chunk.index,
+                index_value=partition_chunk.index_value,
+            )
+            if op.outputs[0].ndim == 2:
+                kw.update(
+                    dict(
+                        columns_value=partition_chunk.columns_value,
+                        dtypes=partition_chunk.dtypes,
+                    )
+                )
+            else:
+                kw.update(dict(dtype=partition_chunk.dtype, name=partition_chunk.name))
+            cs = partition_shuffle_reduce.new_chunks([proxy_chunk], **kw)
+            partition_sort_chunks.append(cs[0])
+        return partition_sort_chunks
+
+    @classmethod
+    def partition_local_data(
+        cls,
+        op: "DataFrameGroupByAgg",
+        sorted_chunks: List[ChunkType],
+        concat_pivot_chunk: ChunkType,
+        in_df: TileableType,
+    ):
+        # properties = dict(by=op.groupby_params["by"], gpu=op.is_gpu())
+        out_df = op.outputs[0]
+        map_chunks = []
+        chunk_shape = (in_df.chunk_shape[0], 1)
+        for chunk in sorted_chunks:
+            chunk_inputs = [chunk, concat_pivot_chunk]
+            output_types = (
+                [OutputType.dataframe_groupby]
+                if out_df.ndim == 2
+                else [OutputType.series_groupby]
+            )
+            map_chunk_op = DataFrameGroupbySortShuffle(
+                shuffle_size=chunk_shape[0],
+                stage=OperandStage.map,
+                n_partition=len(sorted_chunks),
+                output_types=output_types,
+            )
+            kw = dict()
+            if out_df.ndim == 2:
+                kw.update(
+                    dict(
+                        columns_value=chunk_inputs[0].columns_value,
+                        dtypes=chunk_inputs[0].dtypes,
+                    )
+                )
+            else:
+                kw.update(dict(dtype=chunk_inputs[0].dtype, name=chunk_inputs[0].name))
+
+            map_chunks.append(
+                map_chunk_op.new_chunk(
+                    chunk_inputs,
+                    shape=chunk_shape,
+                    index=chunk.index,
+                    index_value=chunk_inputs[0].index_value,
+                    # **kw
+                )
+            )
+
+        return map_chunks
+
+    @classmethod
+    def _gen_shuffle_chunks_with_pivot(
+        cls,
+        op: "DataFrameGroupByAgg",
+        in_df: TileableType,
+        chunks: List[ChunkType],
+        pivot: ChunkType,
+    ):
+        map_chunks = cls.partition_local_data(op, chunks, pivot, in_df)
+
+        proxy_chunk = DataFrameShuffleProxy(
+            output_types=[OutputType.dataframe]
+        ).new_chunk(map_chunks, shape=())
+
+        partition_sort_chunks = cls.partition_merge_data(op, map_chunks, proxy_chunk)
+
+        return partition_sort_chunks
+
     @classmethod
     def _gen_shuffle_chunks(cls, op, in_df, chunks):
         # generate map chunks
@@ -333,7 +449,6 @@ def _gen_shuffle_chunks(cls, op, in_df, chunks):
                     index_value=None,
                 )
             )
-        return reduce_chunks
 
     @classmethod
@@ -349,7 +464,7 @@ def _gen_map_chunks(
             chunk_inputs = [chunk]
             map_op = op.copy().reset_key()
             # force as_index=True for map phase
-            map_op.output_types = [OutputType.dataframe]
+            map_op.output_types = op.output_types
             map_op.groupby_params = map_op.groupby_params.copy()
             map_op.groupby_params["as_index"] = True
             if isinstance(map_op.groupby_params["by"], list):
@@ -367,21 +482,25 @@
             map_op.stage = OperandStage.map
             map_op.pre_funcs = func_infos.pre_funcs
             map_op.agg_funcs = func_infos.agg_funcs
-            new_index = chunk.index if len(chunk.index) == 2 else (chunk.index[0], 0)
-            if op.output_types[0] == OutputType.dataframe:
+            new_index = chunk.index if len(chunk.index) == 2 else (chunk.index[0],)
+            if out_df.ndim == 2:
+                new_index = (new_index[0], 0) if len(new_index) == 1 else new_index
                 map_chunk = map_op.new_chunk(
                     chunk_inputs,
                     shape=out_df.shape,
                     index=new_index,
                     index_value=out_df.index_value,
                     columns_value=out_df.columns_value,
+                    dtypes=out_df.dtypes,
                 )
             else:
+                new_index = new_index[:1] if len(new_index) == 2 else new_index
                 map_chunk = map_op.new_chunk(
                     chunk_inputs,
-                    shape=(out_df.shape[0], 1),
+                    shape=(out_df.shape[0],),
                     index=new_index,
                     index_value=out_df.index_value,
+                    dtype=out_df.dtype,
                 )
             map_chunks.append(map_chunk)
         return map_chunks
@@ -422,7 +541,96 @@ def _tile_with_shuffle(
     ):
         # First, perform groupby and aggregation on each chunk.
         agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos)
-        return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
+        pivot_chunk = None
+        if op.groupby_params["sort"] and len(in_df.chunks) > 1:
+            agg_chunk_len = len(agg_chunks)
+            sample_chunks = cls._sample_chunks(op, agg_chunks)
+            pivot_chunk = cls._gen_pivot_chunk(op, sample_chunks, agg_chunk_len)
+
+        return cls._perform_shuffle(
+            op, agg_chunks, in_df, out_df, func_infos, pivot_chunk
+        )
+
+    @classmethod
+    def _gen_pivot_chunk(
+        cls,
+        op: "DataFrameGroupByAgg",
+        sample_chunks: List[ChunkType],
+        agg_chunk_len: int,
+    ):
+
+        properties = dict(
+            by=op.groupby_params["by"],
+            gpu=op.is_gpu(),
+        )
+
+        # stage 2: gather and merge samples, choose and broadcast p-1 pivots
+        kind = "quicksort"
+        output_types = [OutputType.tensor]
+
+        concat_pivot_op = DataFrameGroupbyConcatPivot(
+            kind=kind,
+            n_partition=agg_chunk_len,
+            output_types=output_types,
+            **properties,
+        )
+
+        concat_pivot_chunk = concat_pivot_op.new_chunk(
+            sample_chunks,
+            shape=(agg_chunk_len,),
+            dtype=object,
+        )
+        return concat_pivot_chunk
+
+    @classmethod
+    def _sample_chunks(
+        cls,
+        op: "DataFrameGroupByAgg",
+        agg_chunks: List[ChunkType],
+    ):
+        chunk_shape = len(agg_chunks)
+        sampled_chunks = []
+
+        properties = dict(
+            by=op.groupby_params["by"],
+            gpu=op.is_gpu(),
+        )
+
+        for i, chunk in enumerate(agg_chunks):
+            kws = []
+            sampled_shape = (
+                (chunk_shape, chunk.shape[1]) if chunk.ndim == 2 else (chunk_shape,)
+            )
+            chunk_index = (i, 0) if chunk.ndim == 2 else (i,)
+            chunk_op = DataFramePSRSGroupbySample(
+                kind="quicksort",
+                n_partition=chunk_shape,
+                output_types=op.output_types,
+                **properties,
+            )
+            if op.output_types[0] == OutputType.dataframe:
+                kws.append(
+                    {
+                        "shape": sampled_shape,
+                        "index_value": chunk.index_value,
+                        "index": chunk_index,
+                        "type": "regular_sampled",
+                    }
+                )
+            else:
+                kws.append(
+                    {
+                        "shape": sampled_shape,
+                        "index_value": chunk.index_value,
+                        "index": chunk_index,
+                        "type": "regular_sampled",
+                        "dtype": chunk.dtype,
+                    }
+                )
+            chunk = chunk_op.new_chunk([chunk], kws=kws)
+            sampled_chunks.append(chunk)
+
+        return sampled_chunks
 
     @classmethod
     def _perform_shuffle(
@@ -432,9 +640,15 @@
         in_df: TileableType,
         out_df: TileableType,
         func_infos: ReductionSteps,
+        pivot_chunk: ChunkType,
     ):
         # Shuffle the aggregation chunk.
-        reduce_chunks = cls._gen_shuffle_chunks(op, in_df, agg_chunks)
+        if pivot_chunk is not None:
+            reduce_chunks = cls._gen_shuffle_chunks_with_pivot(
+                op, in_df, agg_chunks, pivot_chunk
+            )
+        else:
+            reduce_chunks = cls._gen_shuffle_chunks(op, in_df, agg_chunks)
 
         # Combine groups
         agg_chunks = []
@@ -505,14 +719,17 @@ def _combine_tree(
             if len(chks) == 1:
                 chk = chks[0]
             else:
-                concat_op = DataFrameConcat(output_types=[OutputType.dataframe])
+                concat_op = DataFrameConcat(output_types=out_df.op.output_types)
                 # Change index for concatenate
                 for j, c in enumerate(chks):
                     c._index = (j, 0)
-                chk = concat_op.new_chunk(chks, dtypes=chks[0].dtypes)
+                if out_df.ndim == 2:
+                    chk = concat_op.new_chunk(chks, dtypes=chks[0].dtypes)
+                else:
+                    chk = concat_op.new_chunk(chks, dtype=chunks[0].dtype)
             chunk_op = op.copy().reset_key()
             chunk_op.tileable_op_key = None
-            chunk_op.output_types = [OutputType.dataframe]
+            chunk_op.output_types = out_df.op.output_types
             chunk_op.stage = OperandStage.combine
             chunk_op.groupby_params = chunk_op.groupby_params.copy()
             chunk_op.groupby_params.pop("selection", None)
@@ -536,8 +753,11 @@
             )
             chunks = new_chunks
 
-        concat_op = DataFrameConcat(output_types=[OutputType.dataframe])
-        chk = concat_op.new_chunk(chunks, dtypes=chunks[0].dtypes)
+        concat_op = DataFrameConcat(output_types=out_df.op.output_types)
+        if out_df.ndim == 2:
+            chk = concat_op.new_chunk(chunks, dtypes=chunks[0].dtypes)
+        else:
+            chk = concat_op.new_chunk(chunks, dtype=chunks[0].dtype)
         chunk_op = op.copy().reset_key()
         chunk_op.tileable_op_key = op.key
         chunk_op.stage = OperandStage.agg
@@ -621,9 +841,15 @@ def _tile_auto(
             return cls._combine_tree(op, chunks + left_chunks, out_df, func_infos)
         else:
            # otherwise, use shuffle
+            pivot_chunk = None
+            if op.groupby_params["sort"] and len(in_df.chunks) > 1:
+                agg_chunk_len = len(chunks + left_chunks)
+                sample_chunks = cls._sample_chunks(op, chunks + left_chunks)
+                pivot_chunk = cls._gen_pivot_chunk(op, sample_chunks, agg_chunk_len)
+
             logger.debug("Choose shuffle method for groupby operand %s", op)
             return cls._perform_shuffle(
-                op, chunks + left_chunks, in_df, out_df, func_infos
+                op, chunks + left_chunks, in_df, out_df, func_infos, pivot_chunk
             )
 
     @classmethod
@@ -671,8 +897,6 @@ def _get_grouped(cls, op: "DataFrameGroupByAgg", df, ctx, copy=False, grouper=No
         if op.stage == OperandStage.agg:
             grouped = df.groupby(**params)
         else:
-            # for the intermediate phases, do not sort
-            params["sort"] = False
             grouped = df.groupby(**params)
 
         if selection is not None:
diff --git a/mars/dataframe/groupby/sort.py b/mars/dataframe/groupby/sort.py
new file mode 100644
index 0000000000..d0d068e1f8
--- /dev/null
+++ b/mars/dataframe/groupby/sort.py
@@ -0,0 +1,172 @@
+# Copyright 1999-2021 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import pandas as pd
+
+from ... import opcodes as OperandDef
+from ...core import OutputType
+from ...core.operand import MapReduceOperand, OperandStage
+from ...serialization.serializables import (
+    Int32Field,
+    ListField,
+)
+from ...utils import (
+    lazy_import,
+)
+from ..operands import DataFrameOperandMixin
+from ..sort.psrs import DataFramePSRSChunkOperand
+
+cudf = lazy_import("cudf", globals=globals())
+
+
+def _series_to_df(in_series, xdf):
+    in_df = in_series.to_frame()
+    if in_series.name is not None:
+        in_df.columns = xdf.Index([in_series.name])
+    return in_df
+
+
+class DataFrameGroupbyConcatPivot(DataFramePSRSChunkOperand, DataFrameOperandMixin):
+    _op_type_ = OperandDef.GROUPBY_SORT_PIVOT
+
+    @property
+    def output_limit(self):
+        return 1
+
+    @classmethod
+    def execute(cls, ctx, op: "DataFrameGroupbyConcatPivot"):
+        inputs = [ctx[c.key] for c in op.inputs if len(ctx[c.key]) > 0]
+
+        xdf = pd if isinstance(inputs[0], (pd.DataFrame, pd.Series)) else cudf
+
+        a = xdf.concat(inputs, axis=0)
+        a = a.sort_index()
+        index = a.index.drop_duplicates()
+
+        p = len(inputs)
+        if len(index) < p:
+            num = p // len(index) + 1
+            index = index.append([index] * (num - 1))
+
+        index = index.sort_values()
+
+        values = index.values
+
+        slc = np.linspace(
+            p - 1, len(index) - 1, num=len(op.inputs) - 1, endpoint=False
+        ).astype(int)
+        out = values[slc]
+        ctx[op.outputs[-1].key] = out
+
+
+class DataFramePSRSGroupbySample(DataFramePSRSChunkOperand, DataFrameOperandMixin):
+    _op_type_ = OperandDef.GROUPBY_SORT_REGULAR_SAMPLE
+
+    @property
+    def output_limit(self):
+        return 1
+
+    @classmethod
+    def execute(cls, ctx, op: "DataFramePSRSGroupbySample"):
+        a = ctx[op.inputs[0].key][0]
+        xdf = pd if isinstance(a, (pd.DataFrame, pd.Series)) else cudf
+        if isinstance(a, xdf.Series) and op.output_types[0] == OutputType.dataframe:
+            a = _series_to_df(a, xdf)
+
+        n = op.n_partition
+        if a.shape[0] < n:
+            num = n // a.shape[0] + 1
+            a = xdf.concat([a] * num).sort_index()
+
+        w = a.shape[0] * 1.0 / (n + 1)
+
+        slc = np.linspace(max(w - 1, 0), a.shape[0] - 1, num=n, endpoint=False).astype(
+            int
+        )
+
+        out = a.iloc[slc]
+        if op.output_types[0] == OutputType.series and out.ndim == 2:
+            assert out.shape[1] == 1
+            out = out.iloc[:, 0]
+        ctx[op.outputs[-1].key] = out
+
+
+class DataFrameGroupbySortShuffle(MapReduceOperand, DataFrameOperandMixin):
+    _op_type_ = OperandDef.GROUPBY_SORT_SHUFFLE
+
+    # for shuffle map
+    by = ListField("by")
+    n_partition = Int32Field("n_partition")
+
+    def __init__(self, output_types=None, **kw):
+        super().__init__(_output_types=output_types, **kw)
+
+    @property
+    def output_limit(self):
+        return 1
+
+    @classmethod
+    def _execute_map(cls, ctx, op: "DataFrameGroupbySortShuffle"):
+        df, pivots = [ctx[c.key] for c in op.inputs]
+        out = op.outputs[0]
+
+        def _get_out_df(p_index, in_df):
+            if p_index == 0:
+                out_df = in_df.loc[: pivots[p_index]]
+            elif p_index == op.n_partition - 1:
+                out_df = in_df.loc[pivots[p_index - 1] :].drop(
+                    index=pivots[p_index - 1], errors="ignore"
+                )
+            else:
+                out_df = in_df.loc[pivots[p_index - 1] : pivots[p_index]].drop(
+                    index=pivots[p_index - 1], errors="ignore"
+                )
+            return out_df
+
+        for i in range(op.n_partition):
+            index = (i, 0)
+            out_df = tuple(_get_out_df(i, x) for x in df)
+            ctx[out.key, index] = out_df
+
+    @classmethod
+    def _execute_reduce(cls, ctx, op: "DataFrameGroupbySortShuffle"):
+        raw_inputs = list(op.iter_mapper_data(ctx, pop=False))
+        by = op.by
+        xdf = cudf if op.gpu else pd
+
+        r = []
+
+        tuple_len = len(raw_inputs[0])
+        for i in range(tuple_len):
+            r.append(xdf.concat([inp[i] for inp in raw_inputs], axis=0))
+        r = tuple(r)
+
+        ctx[op.outputs[0].key] = r + (by,)
+
+    @classmethod
+    def estimate_size(cls, ctx, op: "DataFrameGroupbySortShuffle"):
+        super().estimate_size(ctx, op)
+        result = ctx[op.outputs[0].key]
+        if op.stage == OperandStage.reduce:
+            ctx[op.outputs[0].key] = (result[0], result[1] * 1.5)
+        else:
+            ctx[op.outputs[0].key] = result
+
+    @classmethod
+    def execute(cls, ctx, op: "DataFrameGroupbySortShuffle"):
+        if op.stage == OperandStage.map:
+            cls._execute_map(ctx, op)
+        else:
+            cls._execute_reduce(ctx, op)
diff --git a/mars/dataframe/groupby/tests/test_groupby.py b/mars/dataframe/groupby/tests/test_groupby.py
index 50a5041da5..3c05d16595 100644
--- a/mars/dataframe/groupby/tests/test_groupby.py
+++ b/mars/dataframe/groupby/tests/test_groupby.py
@@ -18,6 +18,7 @@
 import pandas as pd
 import pytest
 
+from ..sort import DataFrameGroupbySortShuffle
 from .... import dataframe as md
 from .... import opcodes
 from ....core import OutputType, tile
@@ -118,7 +119,7 @@ def test_groupby_agg():
         }
     )
     mdf = md.DataFrame(df, chunk_size=2)
-    r = mdf.groupby("c2").sum(method="shuffle")
+    r = mdf.groupby("c2", sort=False).sum(method="shuffle")
 
     assert isinstance(r.op, DataFrameGroupByAgg)
     assert isinstance(r, DataFrame)
@@ -139,6 +140,29 @@ def test_groupby_agg():
         agg_chunk = chunk.inputs[0].inputs[0].inputs[0].inputs[0]
         assert agg_chunk.op.stage == OperandStage.map
 
+    r = mdf.groupby(
+        "c2",
+    ).sum(method="shuffle")
+
+    assert isinstance(r.op, DataFrameGroupByAgg)
+    assert isinstance(r, DataFrame)
+
+    r = tile(r)
+    assert len(r.chunks) == 5
+    for chunk in r.chunks:
+        assert isinstance(chunk.op, DataFrameGroupByAgg)
+        assert chunk.op.stage == OperandStage.agg
+        assert isinstance(chunk.inputs[0].op, DataFrameGroupbySortShuffle)
+        assert chunk.inputs[0].op.stage == OperandStage.reduce
+        assert isinstance(chunk.inputs[0].inputs[0].op, DataFrameShuffleProxy)
+        assert isinstance(
+            chunk.inputs[0].inputs[0].inputs[0].op, DataFrameGroupbySortShuffle
+        )
+        assert chunk.inputs[0].inputs[0].inputs[0].op.stage == OperandStage.map
+
+        agg_chunk = chunk.inputs[0].inputs[0].inputs[0].inputs[0]
+        assert agg_chunk.op.stage == OperandStage.map
+
     # test unknown method
     with pytest.raises(ValueError):
         mdf.groupby("c2").sum(method="not_exist")
diff --git a/mars/dataframe/groupby/tests/test_groupby_execution.py b/mars/dataframe/groupby/tests/test_groupby_execution.py
index c1208e2194..57ce84db43 100644
--- a/mars/dataframe/groupby/tests/test_groupby_execution.py
+++ b/mars/dataframe/groupby/tests/test_groupby_execution.py
@@ -484,6 +484,100 @@ def test_dataframe_groupby_agg(setup):
     )
 
 
+def test_dataframe_groupby_agg_sort(setup):
+    agg_funs = [
+        "std",
+        "mean",
+        "var",
+        "max",
+        "count",
+        "size",
+        "all",
+        "any",
+        "skew",
+        "kurt",
+        "sem",
+    ]
+
+    rs = np.random.RandomState(0)
+    raw = pd.DataFrame(
+        {
+            "c1": np.arange(100).astype(np.int64),
+            "c2": rs.choice(["a", "b", "c"], (100,)),
+            "c3": rs.rand(100),
+        }
+    )
+    mdf = md.DataFrame(raw, chunk_size=13)
+
+    for method in ["tree", "shuffle"]:
+        r = mdf.groupby("c2").agg("size", method=method)
+        pd.testing.assert_series_equal(
+            r.execute().fetch(), raw.groupby("c2").agg("size")
+        )
+
+        for agg_fun in agg_funs:
+            if agg_fun == "size":
+                continue
+            r = mdf.groupby("c2").agg(agg_fun, method=method)
+            pd.testing.assert_frame_equal(
+                r.execute().fetch(),
+                raw.groupby("c2").agg(agg_fun),
+            )
+
+        r = mdf.groupby("c2").agg(agg_funs, method=method)
+        pd.testing.assert_frame_equal(
+            r.execute().fetch(),
+            raw.groupby("c2").agg(agg_funs),
+        )
+
+        agg = OrderedDict([("c1", ["min", "mean"]), ("c3", "std")])
+        r = mdf.groupby("c2").agg(agg, method=method)
+        pd.testing.assert_frame_equal(r.execute().fetch(), raw.groupby("c2").agg(agg))
+
+        agg = OrderedDict([("c1", "min"), ("c3", "sum")])
+        r = mdf.groupby("c2").agg(agg, method=method)
+        pd.testing.assert_frame_equal(r.execute().fetch(), raw.groupby("c2").agg(agg))
+
+        r = mdf.groupby("c2").agg({"c1": "min", "c3": "min"}, method=method)
+        pd.testing.assert_frame_equal(
+            r.execute().fetch(),
+            raw.groupby("c2").agg({"c1": "min", "c3": "min"}),
+        )
+
+        r = mdf.groupby("c2").agg({"c1": "min"}, method=method)
+        pd.testing.assert_frame_equal(
+            r.execute().fetch(),
+            raw.groupby("c2").agg({"c1": "min"}),
+        )
+
+        # test groupby series
+        r = mdf.groupby(mdf["c2"]).sum(method=method)
+        pd.testing.assert_frame_equal(r.execute().fetch(), raw.groupby(raw["c2"]).sum())
+
+    r = mdf.groupby("c2").size(method="tree")
+    pd.testing.assert_series_equal(r.execute().fetch(), raw.groupby("c2").size())
+
+    # test inserted kurt method
+    r = mdf.groupby("c2").kurtosis(method="tree")
+    pd.testing.assert_frame_equal(r.execute().fetch(), raw.groupby("c2").kurtosis())
+
+    for agg_fun in agg_funs:
+        if agg_fun == "size" or callable(agg_fun):
+            continue
+        r = getattr(mdf.groupby("c2"), agg_fun)(method="tree")
+        pd.testing.assert_frame_equal(
+            r.execute().fetch(), getattr(raw.groupby("c2"), agg_fun)()
+        )
+
+    # test as_index=False takes no effect
+    r = mdf.groupby(["c1", "c2"], as_index=False).agg(["mean", "count"])
+    pd.testing.assert_frame_equal(
+        r.execute().fetch(),
+        raw.groupby(["c1", "c2"], as_index=False).agg(["mean", "count"]),
+    )
+    assert r.op.groupby_params["as_index"] is True
+
+
 def test_series_groupby_agg(setup):
     rs = np.random.RandomState(0)
     series1 = pd.Series(rs.rand(10))
diff --git a/mars/opcodes.py b/mars/opcodes.py
index b6cb0fdb43..561a2359d7 100644
--- a/mars/opcodes.py
+++ b/mars/opcodes.py
@@ -426,6 +426,9 @@
 GROUPBY_CONCAT = 2034
 GROUPBY_HEAD = 2035
 GROUPBY_SAMPLE_ILOC = 2036
+GROUPBY_SORT_REGULAR_SAMPLE = 2037
+GROUPBY_SORT_PIVOT = 2038
+GROUPBY_SORT_SHUFFLE = 2039
 
 # parallel sorting by regular sampling
 PSRS_SORT_REGULAR_SMAPLE = 2040
diff --git a/mars/serialization/serializables/core.py b/mars/serialization/serializables/core.py
index cee72105db..e11e1245cf 100644
--- a/mars/serialization/serializables/core.py
+++ b/mars/serialization/serializables/core.py
@@ -124,7 +124,7 @@ class Serializable(metaclass=SerializableMeta):
 
     def __init__(self, *args, **kwargs):
         if args:  # pragma: no cover
-            values = dict(zip(self.__slots__, args))
+            values = dict(zip(self._FIELDS, args))
             values.update(kwargs)
         else:
             values = kwargs