Commit
[BACKPORT] Support sort=True for Groupby (mars-project#2959) (mars-…
wjsi authored May 23, 2022
1 parent 7b8068c commit f95ea23
Showing 6 changed files with 534 additions and 17 deletions.
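
For context: with a multi-chunk input, sort=True was previously not honored by the shuffle-based aggregation path, so group keys could come back unordered. A minimal sketch of the behavior this backport enables (hypothetical data; assumes the standard Mars DataFrame API):

    import pandas as pd
    import mars.dataframe as md

    raw = pd.DataFrame({"key": [3, 1, 2, 1, 3], "val": [1, 2, 3, 4, 5]})
    df = md.DataFrame(raw, chunk_size=2)  # force several chunks

    # sort=True (the pandas default): the result index is ordered by group key,
    # now honored across chunk boundaries via the PSRS-style shuffle below.
    print(df.groupby("key", sort=True).sum().execute())

    # sort=False keeps the order-agnostic fast path.
    print(df.groupby("key", sort=False).sum().execute())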
254 changes: 239 additions & 15 deletions mars/dataframe/groupby/aggregation.py
@@ -55,6 +55,11 @@
from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs
from ..utils import parse_index, build_concatenated_rows_frame, is_cudf
from .core import DataFrameGroupByOperand
from .sort import (
DataFramePSRSGroupbySample,
DataFrameGroupbyConcatPivot,
DataFrameGroupbySortShuffle,
)

cp = lazy_import("cupy", rename="cp")
cudf = lazy_import("cudf")
@@ -293,6 +298,117 @@ def __call__(self, groupby):
else:
return self._call_series(groupby, df)

@classmethod
def partition_merge_data(
cls,
op: "DataFrameGroupByAgg",
partition_chunks: List[ChunkType],
proxy_chunk: ChunkType,
):
        # stage 4: all the ith partitions are gathered and merged
partition_sort_chunks = []
properties = dict(by=op.groupby_params["by"], gpu=op.is_gpu())
out_df = op.outputs[0]

for i, partition_chunk in enumerate(partition_chunks):
output_types = (
[OutputType.dataframe_groupby]
if out_df.ndim == 2
else [OutputType.series_groupby]
)
partition_shuffle_reduce = DataFrameGroupbySortShuffle(
stage=OperandStage.reduce,
reducer_index=(i, 0),
output_types=output_types,
**properties,
)
chunk_shape = list(partition_chunk.shape)
chunk_shape[0] = np.nan

kw = dict(
shape=tuple(chunk_shape),
index=partition_chunk.index,
index_value=partition_chunk.index_value,
)
if op.outputs[0].ndim == 2:
kw.update(
dict(
columns_value=partition_chunk.columns_value,
dtypes=partition_chunk.dtypes,
)
)
else:
kw.update(dict(dtype=partition_chunk.dtype, name=partition_chunk.name))
cs = partition_shuffle_reduce.new_chunks([proxy_chunk], **kw)
partition_sort_chunks.append(cs[0])
return partition_sort_chunks

@classmethod
def partition_local_data(
cls,
op: "DataFrameGroupByAgg",
sorted_chunks: List[ChunkType],
concat_pivot_chunk: ChunkType,
in_df: TileableType,
):
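        # stage 3: split each chunk's local data by the broadcast pivots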
out_df = op.outputs[0]
map_chunks = []
chunk_shape = (in_df.chunk_shape[0], 1)
for chunk in sorted_chunks:
chunk_inputs = [chunk, concat_pivot_chunk]
output_types = (
[OutputType.dataframe_groupby]
if out_df.ndim == 2
else [OutputType.series_groupby]
)
map_chunk_op = DataFrameGroupbySortShuffle(
shuffle_size=chunk_shape[0],
stage=OperandStage.map,
n_partition=len(sorted_chunks),
output_types=output_types,
)
kw = dict()
if out_df.ndim == 2:
kw.update(
dict(
columns_value=chunk_inputs[0].columns_value,
dtypes=chunk_inputs[0].dtypes,
)
)
else:
kw.update(dict(dtype=chunk_inputs[0].dtype, name=chunk_inputs[0].name))

map_chunks.append(
map_chunk_op.new_chunk(
chunk_inputs,
shape=chunk_shape,
index=chunk.index,
index_value=chunk_inputs[0].index_value,
                    **kw,
)
)

return map_chunks

@classmethod
def _gen_shuffle_chunks_with_pivot(
cls,
op: "DataFrameGroupByAgg",
in_df: TileableType,
chunks: List[ChunkType],
pivot: ChunkType,
):
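        # PSRS stages 3 and 4: map chunks split every input by the broadcast
        # pivots, then reduce chunks gather the ith partitions of all inputs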
map_chunks = cls.partition_local_data(op, chunks, pivot, in_df)

proxy_chunk = DataFrameShuffleProxy(
output_types=[OutputType.dataframe]
).new_chunk(map_chunks, shape=())

partition_sort_chunks = cls.partition_merge_data(op, map_chunks, proxy_chunk)

return partition_sort_chunks

@classmethod
def _gen_shuffle_chunks(cls, op, in_df, chunks):
# generate map chunks
@@ -333,7 +449,6 @@ def _gen_shuffle_chunks(cls, op, in_df, chunks):
index_value=None,
)
)

return reduce_chunks

@classmethod
@@ -349,7 +464,7 @@ def _gen_map_chunks(
chunk_inputs = [chunk]
map_op = op.copy().reset_key()
# force as_index=True for map phase
-            map_op.output_types = [OutputType.dataframe]
+            map_op.output_types = op.output_types
map_op.groupby_params = map_op.groupby_params.copy()
map_op.groupby_params["as_index"] = True
if isinstance(map_op.groupby_params["by"], list):
@@ -367,21 +482,25 @@
map_op.stage = OperandStage.map
map_op.pre_funcs = func_infos.pre_funcs
map_op.agg_funcs = func_infos.agg_funcs
-            new_index = chunk.index if len(chunk.index) == 2 else (chunk.index[0], 0)
-            if op.output_types[0] == OutputType.dataframe:
+            new_index = chunk.index if len(chunk.index) == 2 else (chunk.index[0],)
+            if out_df.ndim == 2:
+                new_index = (new_index[0], 0) if len(new_index) == 1 else new_index
map_chunk = map_op.new_chunk(
chunk_inputs,
shape=out_df.shape,
index=new_index,
index_value=out_df.index_value,
columns_value=out_df.columns_value,
dtypes=out_df.dtypes,
)
else:
+                new_index = new_index[:1] if len(new_index) == 2 else new_index
map_chunk = map_op.new_chunk(
chunk_inputs,
-                    shape=(out_df.shape[0], 1),
+                    shape=(out_df.shape[0],),
index=new_index,
index_value=out_df.index_value,
dtype=out_df.dtype,
)
map_chunks.append(map_chunk)
return map_chunks
@@ -422,7 +541,96 @@ def _tile_with_shuffle(
):
# First, perform groupby and aggregation on each chunk.
agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos)
-        return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
+        pivot_chunk = None
+        if op.groupby_params["sort"] and len(in_df.chunks) > 1:
+            agg_chunk_len = len(agg_chunks)
+            sample_chunks = cls._sample_chunks(op, agg_chunks)
+            pivot_chunk = cls._gen_pivot_chunk(op, sample_chunks, agg_chunk_len)
+
+        return cls._perform_shuffle(
+            op, agg_chunks, in_df, out_df, func_infos, pivot_chunk
+        )

@classmethod
def _gen_pivot_chunk(
cls,
op: "DataFrameGroupByAgg",
sample_chunks: List[ChunkType],
agg_chunk_len: int,
):

properties = dict(
by=op.groupby_params["by"],
gpu=op.is_gpu(),
)

# stage 2: gather and merge samples, choose and broadcast p-1 pivots
kind = "quicksort"
output_types = [OutputType.tensor]

concat_pivot_op = DataFrameGroupbyConcatPivot(
kind=kind,
n_partition=agg_chunk_len,
output_types=output_types,
**properties,
)

concat_pivot_chunk = concat_pivot_op.new_chunk(
sample_chunks,
shape=(agg_chunk_len,),
dtype=object,
)
return concat_pivot_chunk

@classmethod
def _sample_chunks(
cls,
op: "DataFrameGroupByAgg",
agg_chunks: List[ChunkType],
):
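        # stage 1: take regular samples from each pre-aggregated chunk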
chunk_shape = len(agg_chunks)
sampled_chunks = []

properties = dict(
by=op.groupby_params["by"],
gpu=op.is_gpu(),
)

for i, chunk in enumerate(agg_chunks):
kws = []
sampled_shape = (
(chunk_shape, chunk.shape[1]) if chunk.ndim == 2 else (chunk_shape,)
)
chunk_index = (i, 0) if chunk.ndim == 2 else (i,)
chunk_op = DataFramePSRSGroupbySample(
kind="quicksort",
n_partition=chunk_shape,
output_types=op.output_types,
**properties,
)
if op.output_types[0] == OutputType.dataframe:
kws.append(
{
"shape": sampled_shape,
"index_value": chunk.index_value,
"index": chunk_index,
"type": "regular_sampled",
}
)
else:
kws.append(
{
"shape": sampled_shape,
"index_value": chunk.index_value,
"index": chunk_index,
"type": "regular_sampled",
"dtype": chunk.dtype,
}
)
chunk = chunk_op.new_chunk([chunk], kws=kws)
sampled_chunks.append(chunk)

return sampled_chunks

@classmethod
def _perform_shuffle(
Expand All @@ -432,9 +640,15 @@ def _perform_shuffle(
in_df: TileableType,
out_df: TileableType,
func_infos: ReductionSteps,
+        pivot_chunk: ChunkType,
):
# Shuffle the aggregation chunk.
-        reduce_chunks = cls._gen_shuffle_chunks(op, in_df, agg_chunks)
+        if pivot_chunk is not None:
+            reduce_chunks = cls._gen_shuffle_chunks_with_pivot(
+                op, in_df, agg_chunks, pivot_chunk
+            )
+        else:
+            reduce_chunks = cls._gen_shuffle_chunks(op, in_df, agg_chunks)

# Combine groups
agg_chunks = []
@@ -505,14 +719,17 @@ def _combine_tree(
if len(chks) == 1:
chk = chks[0]
else:
-                concat_op = DataFrameConcat(output_types=[OutputType.dataframe])
+                concat_op = DataFrameConcat(output_types=out_df.op.output_types)
# Change index for concatenate
for j, c in enumerate(chks):
c._index = (j, 0)
-                chk = concat_op.new_chunk(chks, dtypes=chks[0].dtypes)
+                if out_df.ndim == 2:
+                    chk = concat_op.new_chunk(chks, dtypes=chks[0].dtypes)
+                else:
+                    chk = concat_op.new_chunk(chks, dtype=chks[0].dtype)
chunk_op = op.copy().reset_key()
chunk_op.tileable_op_key = None
-            chunk_op.output_types = [OutputType.dataframe]
+            chunk_op.output_types = out_df.op.output_types
chunk_op.stage = OperandStage.combine
chunk_op.groupby_params = chunk_op.groupby_params.copy()
chunk_op.groupby_params.pop("selection", None)
@@ -536,8 +753,11 @@
)
chunks = new_chunks

-        concat_op = DataFrameConcat(output_types=[OutputType.dataframe])
-        chk = concat_op.new_chunk(chunks, dtypes=chunks[0].dtypes)
+        concat_op = DataFrameConcat(output_types=out_df.op.output_types)
+        if out_df.ndim == 2:
+            chk = concat_op.new_chunk(chunks, dtypes=chunks[0].dtypes)
+        else:
+            chk = concat_op.new_chunk(chunks, dtype=chunks[0].dtype)
chunk_op = op.copy().reset_key()
chunk_op.tileable_op_key = op.key
chunk_op.stage = OperandStage.agg
@@ -621,9 +841,15 @@ def _tile_auto(
return cls._combine_tree(op, chunks + left_chunks, out_df, func_infos)
else:
# otherwise, use shuffle
+            pivot_chunk = None
+            if op.groupby_params["sort"] and len(in_df.chunks) > 1:
+                agg_chunk_len = len(chunks + left_chunks)
+                sample_chunks = cls._sample_chunks(op, chunks + left_chunks)
+                pivot_chunk = cls._gen_pivot_chunk(op, sample_chunks, agg_chunk_len)
+
logger.debug("Choose shuffle method for groupby operand %s", op)
return cls._perform_shuffle(
-                op, chunks + left_chunks, in_df, out_df, func_infos
+                op, chunks + left_chunks, in_df, out_df, func_infos, pivot_chunk
)

@classmethod
@@ -671,8 +897,6 @@ def _get_grouped(cls, op: "DataFrameGroupByAgg", df, ctx, copy=False, grouper=No
if op.stage == OperandStage.agg:
grouped = df.groupby(**params)
else:
-            # for the intermediate phases, do not sort
-            params["sort"] = False
grouped = df.groupby(**params)

if selection is not None:
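Note on the algorithm: the new code path follows Parallel Sorting by Regular Sampling (PSRS), the same scheme Mars uses for its distributed sort. Each aggregated chunk is sampled (stage 1), the samples are merged and p-1 pivots are broadcast (stage 2), each chunk is split by the pivots in the shuffle-map step (stage 3), and the ith partitions are gathered in the shuffle-reduce step (stage 4). A toy, self-contained sketch in plain numpy (not the actual Mars operands):

    import numpy as np

    def psrs_partition(chunks):
        p = len(chunks)
        # stage 1: sort each chunk locally and take p regular samples from each
        sorted_chunks = [np.sort(c) for c in chunks]
        samples = np.concatenate(
            [c[np.linspace(0, len(c) - 1, p).astype(int)] for c in sorted_chunks]
        )
        # stage 2: merge the samples and choose p - 1 pivots
        samples.sort(kind="quicksort")
        pivots = samples[p::p][: p - 1]
        # stage 3: split every chunk by the pivots (the shuffle-map step)
        split = [np.split(c, np.searchsorted(c, pivots)) for c in sorted_chunks]
        # stage 4: gather all ith partitions together (the shuffle-reduce step)
        return [np.concatenate([parts[i] for parts in split]) for i in range(p)]

    chunks = [np.array([9, 1, 5]), np.array([2, 8, 4]), np.array([7, 3, 6])]
    print(psrs_partition(chunks))  # key ranges no longer overlap across outputs

In the operands above, DataFramePSRSGroupbySample, DataFrameGroupbyConcatPivot and DataFrameGroupbySortShuffle play the roles of stages 1, 2 and 3-4 respectively, partitioning on group keys instead of raw values.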
