Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shuffle both sides at the same time for md.merge #3041

Merged
merged 3 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 122 additions & 53 deletions mars/dataframe/merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
NamedTupleField,
)
from ...typing import TileableType
from ...utils import has_unknown_shape
from ...utils import has_unknown_shape, lazy_import
from ..base.bloom_filter import filter_by_bloom_filter
from ..core import DataFrame, Series
from ..core import DataFrame, Series, DataFrameChunk
from ..operands import DataFrameOperand, DataFrameOperandMixin, DataFrameShuffleProxy
from ..utils import (
auto_merge_chunks,
Expand All @@ -46,6 +46,7 @@
parse_index,
hash_dataframe_on,
infer_index_value,
is_cudf,
)

import logging
Expand All @@ -63,34 +64,29 @@
BLOOM_FILTER_ON_OPTIONS = ["large", "small", "both"]
DEFAULT_BLOOM_FILTER_ON = "large"

cudf = lazy_import("cudf")


class DataFrameMergeAlign(MapReduceOperand, DataFrameOperandMixin):
_op_type_ = OperandDef.DATAFRAME_SHUFFLE_MERGE_ALIGN

_index_shuffle_size = Int32Field("index_shuffle_size")
_shuffle_on = AnyField("shuffle_on")

_input = KeyField("input")
index_shuffle_size = Int32Field("index_shuffle_size")
shuffle_on = AnyField("shuffle_on")

def __init__(self, index_shuffle_size=None, shuffle_on=None, **kw):
super().__init__(
_index_shuffle_size=index_shuffle_size,
_shuffle_on=shuffle_on,
_output_types=[OutputType.dataframe],
**kw,
)
input = KeyField("input")

@property
def index_shuffle_size(self):
return self._index_shuffle_size
def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)
if output_types is None:
if self.stage == OperandStage.map:
output_types = [OutputType.dataframe]
elif self.stage == OperandStage.reduce:
output_types = [OutputType.dataframe] * 2
self._output_types = output_types

@property
def shuffle_on(self):
return self._shuffle_on

def _set_inputs(self, inputs):
super()._set_inputs(inputs)
self._input = self._inputs[0]
def output_limit(self) -> int:
return len(self.output_types)

@classmethod
def execute_map(cls, ctx, op):
Expand Down Expand Up @@ -123,16 +119,18 @@ def execute_map(cls, ctx, op):

@classmethod
def execute_reduce(cls, ctx, op: "DataFrameMergeAlign"):
chunk = op.outputs[0]
input_idx_to_df = dict(op.iter_mapper_data_with_index(ctx, skip_none=True))
row_idxes = sorted({idx[0] for idx in input_idx_to_df})

res = []
for row_idx in row_idxes:
row_df = input_idx_to_df.get((row_idx, 0), None)
if row_df is not None:
res.append(row_df)
ctx[chunk.key] = pd.concat(res, axis=0)
for i, chunk in enumerate(op.outputs):
input_idx_to_df = dict(
op.iter_mapper_data_with_index(ctx, mapper_id=i, skip_none=True)
)
row_idxes = sorted({idx[0] for idx in input_idx_to_df})
res = []
for row_idx in row_idxes:
row_df = input_idx_to_df.get((row_idx, 0), None)
if row_df is not None:
res.append(row_df)
xdf = cudf if is_cudf(res[0]) else pd
ctx[chunk.key] = xdf.concat(res, axis=0)

@classmethod
def execute(cls, ctx, op):
Expand Down Expand Up @@ -213,6 +211,30 @@ def __call__(self, left, right):
columns_value=parse_index(merged.columns, store_data=True),
)

@classmethod
def _gen_map_chunk(
cls,
chunk: DataFrameChunk,
shuffle_on: Union[List, str],
out_size: int,
mapper_id: int = 0,
):
map_op = DataFrameMergeAlign(
stage=OperandStage.map,
shuffle_on=shuffle_on,
sparse=chunk.issparse(),
mapper_id=mapper_id,
index_shuffle_size=out_size,
)
return map_op.new_chunk(
[chunk],
shape=(np.nan, np.nan),
dtypes=chunk.dtypes,
index=chunk.index,
index_value=chunk.index_value,
columns_value=chunk.columns_value,
)

@classmethod
def _gen_shuffle_chunks(
cls,
Expand All @@ -221,24 +243,9 @@ def _gen_shuffle_chunks(
df: Union[DataFrame, Series],
):
# gen map chunks
map_chunks = []
for chunk in df.chunks:
map_op = DataFrameMergeAlign(
stage=OperandStage.map,
shuffle_on=shuffle_on,
sparse=chunk.issparse(),
index_shuffle_size=out_shape[0],
)
map_chunks.append(
map_op.new_chunk(
[chunk],
shape=(np.nan, np.nan),
dtypes=chunk.dtypes,
index=chunk.index,
index_value=chunk.index_value,
columns_value=chunk.columns_value,
)
)
map_chunks = [
cls._gen_map_chunk(chunk, shuffle_on, out_shape[0]) for chunk in df.chunks
]

proxy_chunk = DataFrameShuffleProxy(
output_types=[OutputType.dataframe]
Expand All @@ -254,7 +261,9 @@ def _gen_shuffle_chunks(
reduce_chunks = []
for out_idx in itertools.product(*(range(s) for s in out_shape)):
reduce_op = DataFrameMergeAlign(
stage=OperandStage.reduce, sparse=proxy_chunk.issparse()
stage=OperandStage.reduce,
sparse=proxy_chunk.issparse(),
output_types=[OutputType.dataframe],
)
reduce_chunks.append(
reduce_op.new_chunk(
Expand All @@ -268,6 +277,65 @@ def _gen_shuffle_chunks(
)
return reduce_chunks

@classmethod
def _gen_both_shuffle_chunks(
cls,
out_shape: Tuple,
left_shuffle_on: Union[List, str],
right_shuffle_on: Union[List, str],
left: Union[DataFrame, Series],
right: Union[DataFrame, Series],
):
# gen map chunks
# for left dataframe, use 0 as mapper_id
left_map_chunks = [
cls._gen_map_chunk(chunk, left_shuffle_on, out_shape[0], mapper_id=0)
for chunk in left.chunks
]
# for right dataframe, use 1 as mapper_id
right_map_chunks = [
cls._gen_map_chunk(chunk, right_shuffle_on, out_shape[0], mapper_id=1)
for chunk in right.chunks
]
map_chunks = left_map_chunks + right_map_chunks

proxy_chunk = DataFrameShuffleProxy(
output_types=[OutputType.dataframe]
).new_chunk(
map_chunks,
shape=(),
dtypes=left.dtypes,
index_value=left.index_value,
columns_value=left.columns_value,
)

# gen reduce chunks
left_reduce_chunks = []
right_reduce_chunks = []
for out_idx in itertools.product(*(range(s) for s in out_shape)):
reduce_op = DataFrameMergeAlign(
stage=OperandStage.reduce, sparse=proxy_chunk.issparse()
)
left_param = {
"shape": (np.nan, np.nan),
"dtypes": left.dtypes,
"index": out_idx,
"index_value": left.index_value,
"columns_value": left.columns_value,
}
right_param = {
"shape": (np.nan, np.nan),
"dtypes": right.dtypes,
"index": out_idx,
"index_value": right.index_value,
"columns_value": right.columns_value,
}
params = [left_param, right_param]
left_reduce, right_reduce = reduce_op.new_chunks([proxy_chunk], kws=params)
left_reduce_chunks.append(left_reduce)
right_reduce_chunks.append(right_reduce)
return left_reduce_chunks, right_reduce_chunks

@classmethod
def _apply_bloom_filter(
cls,
Expand Down Expand Up @@ -404,8 +472,9 @@ def _tile_shuffle(
right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)

# do shuffle
left_chunks = cls._gen_shuffle_chunks(out_chunk_shape, left_on, left)
right_chunks = cls._gen_shuffle_chunks(out_chunk_shape, right_on, right)
left_chunks, right_chunks = cls._gen_both_shuffle_chunks(
out_chunk_shape, left_on, right_on, left, right
)

out_chunks = []
for left_chunk, right_chunk in zip(left_chunks, right_chunks):
Expand Down
20 changes: 10 additions & 10 deletions mars/dataframe/merge/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ def test_merge():
assert left.op.stage == OperandStage.reduce
assert isinstance(right.op, DataFrameMergeAlign)
assert right.op.stage == OperandStage.reduce
assert len(left.inputs[0].inputs) == 2
assert len(right.inputs[0].inputs) == 2
for lchunk in left.inputs[0].inputs:
assert len(left.inputs[0].inputs) == 4
assert len(right.inputs[0].inputs) == 4
for lchunk in left.inputs[0].inputs[:2]:
assert isinstance(lchunk.op, DataFrameMergeAlign)
assert lchunk.op.stage == OperandStage.map
assert lchunk.op.index_shuffle_size == 2
assert lchunk.op.shuffle_on == kw.get("on", None) or kw.get(
"left_on", None
)
for rchunk in right.inputs[0].inputs:
for rchunk in right.inputs[0].inputs[2:]:
assert isinstance(rchunk.op, DataFrameMergeAlign)
assert rchunk.op.stage == OperandStage.map
assert rchunk.op.index_shuffle_size == 2
Expand Down Expand Up @@ -127,8 +127,8 @@ def test_join():
assert left.op.stage == OperandStage.reduce
assert isinstance(right.op, DataFrameMergeAlign)
assert right.op.stage == OperandStage.reduce
assert len(left.inputs[0].inputs) == 2
assert len(right.inputs[0].inputs) == 3
assert len(left.inputs[0].inputs) == 5
assert len(right.inputs[0].inputs) == 5
for lchunk in left.inputs[0].inputs:
assert isinstance(lchunk.op, DataFrameMergeAlign)
assert lchunk.op.stage == OperandStage.map
Expand Down Expand Up @@ -175,14 +175,14 @@ def test_join_on():
assert left.op.stage == OperandStage.reduce
assert isinstance(right.op, DataFrameMergeAlign)
assert right.op.stage == OperandStage.reduce
assert len(left.inputs[0].inputs) == 2
assert len(right.inputs[0].inputs) == 3
for lchunk in left.inputs[0].inputs:
assert len(left.inputs[0].inputs) == 5
assert len(right.inputs[0].inputs) == 5
for lchunk in left.inputs[0].inputs[:2]:
assert isinstance(lchunk.op, DataFrameMergeAlign)
assert lchunk.op.stage == OperandStage.map
assert lchunk.op.index_shuffle_size == 3
assert lchunk.op.shuffle_on == kw.get("on", None)
for rchunk in right.inputs[0].inputs:
for rchunk in right.inputs[0].inputs[2:]:
assert isinstance(rchunk.op, DataFrameMergeAlign)
assert rchunk.op.stage == OperandStage.map
assert rchunk.op.index_shuffle_size == 3
Expand Down