Skip to content

Commit

Permalink
Support bloom filtering on both sides of md.merge
Browse files Browse the repository at this point in the history
  • Loading branch information
继盛 committed Apr 26, 2022
1 parent 712d7f1 commit 252322e
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 56 deletions.
186 changes: 131 additions & 55 deletions mars/dataframe/merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import itertools
from collections import namedtuple
from enum import Enum
from typing import Dict, List, Optional, Union, Tuple
from typing import Any, Dict, List, Optional, Union, Tuple

import numpy as np
import pandas as pd
Expand All @@ -27,6 +27,7 @@
from ...serialization.serializables import (
AnyField,
BoolField,
DictField,
StringField,
TupleField,
KeyField,
Expand All @@ -50,6 +51,16 @@
import logging

logger = logging.getLogger(__name__)
DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD = 10
# use bloom filter to filter large DataFrame
BLOOM_FILTER_OPTIONS = [
"max_elements",
"error_rate",
"apply_chunk_size_threshold",
"filter",
]
BLOOM_FILTER_ON_OPTIONS = ["large", "small", "both"]
DEFAULT_BLOOM_FILTER_ON = "large"


class DataFrameMergeAlign(MapReduceOperand, DataFrameOperandMixin):
Expand Down Expand Up @@ -157,6 +168,7 @@ class DataFrameMerge(DataFrameOperand, DataFrameOperandMixin):
auto_merge = StringField("auto_merge")
auto_merge_threshold = Int32Field("auto_merge_threshold")
bloom_filter = AnyField("bloom_filter")
bloom_filter_options = DictField("bloom_filter_options")

# only for broadcast merge
split_info = NamedTupleField("split_info")
Expand Down Expand Up @@ -265,24 +277,40 @@ def _apply_bloom_filter(
op: "DataFrameMerge",
):
bloom_filter_params = dict()
if isinstance(op.bloom_filter, dict):
if "max_elements" in op.bloom_filter:
bloom_filter_params["max_elements"] = op.bloom_filter["max_elements"]
if "error_rate" in op.bloom_filter:
bloom_filter_params["error_rate"] = op.bloom_filter["error_rate"]
bloom_filter_options = op.bloom_filter_options or dict()
if "max_elements" in bloom_filter_options:
bloom_filter_params["max_elements"] = bloom_filter_options["max_elements"]
if "error_rate" in bloom_filter_options:
bloom_filter_params["error_rate"] = bloom_filter_options["error_rate"]
if "max_elements" not in bloom_filter_params:
bloom_filter_params["max_elements"] = max(
c.shape[0] for c in left.chunks + right.chunks
)
if len(left.chunks) > len(right.chunks):
filter_on = bloom_filter_options.get("filter", DEFAULT_BLOOM_FILTER_ON)
if filter_on == "large":
if len(left.chunks) > len(right.chunks):
left = filter_by_bloom_filter(
left, right, left_on, right_on, **bloom_filter_params
)
else:
right = filter_by_bloom_filter(
right, left, right_on, left_on, **bloom_filter_params
)
elif filter_on == "small":
if len(left.chunks) < len(right.chunks):
left = filter_by_bloom_filter(
left, right, left_on, right_on, **bloom_filter_params
)
else:
right = filter_by_bloom_filter(
right, left, right_on, left_on, **bloom_filter_params
)
else:
assert filter_on == "both"
# both
left = filter_by_bloom_filter(
left,
right,
left_on,
right_on,
**bloom_filter_params,
left, right, left_on, right_on, **bloom_filter_params
)
else:
right = filter_by_bloom_filter(
right, left, right_on, left_on, **bloom_filter_params
)
Expand Down Expand Up @@ -587,14 +615,50 @@ def _if_apply_bloom_filter(
op: "DataFrameMerge",
left: TileableType,
right: TileableType,
bloom_filter_chunk_threshold: int,
):
if len(left.chunks + right.chunks) <= bloom_filter_chunk_threshold:
# bloom filter can only work for inner merge
if op.how != "inner" or op.bloom_filter is False:
return False
elif method == MergeMethod.shuffle and op.bloom_filter:
elif op.bloom_filter is True:
return True
else:

bloom_filter_options = op.bloom_filter_options or dict()
bloom_filter_chunk_threshold = bloom_filter_options.get(
"apply_chunk_size_threshold", DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD
)

# bloom_filter == auto
if len(left.chunks + right.chunks) <= bloom_filter_chunk_threshold:
# if size of input chunks <= threshold, skip bloom filter
return False
elif method == MergeMethod.shuffle:
# for shuffle, enable bloom filter by default
return True
elif method == MergeMethod.broadcast:
# for broadcast, enable bloom filter when filter not specified:
# 1. distributed, n_worker > 1
# 2. small chunks' size is less than all cpu count
ctx = get_context()
n_worker = len(ctx.get_worker_addresses())
all_cpu_count = ctx.get_total_n_cpu()
left_n_chunk = left.chunk_shape[0]
right_n_chunk = right.chunk_shape[0]
bloom_filter_on = bloom_filter_options.get("filter")
if (
bloom_filter_on is None
and n_worker > 1
and min(left_n_chunk, right_n_chunk) < all_cpu_count
):
# check small DataFrame's chunks size,
# if less than all cpu count,
# means that some cpu may be free,
# then do bloom filter
return True
else:
# specified filter, will do bloom filter
return True

return False

@classmethod
def tile(cls, op: "DataFrameMerge"):
Expand All @@ -614,32 +678,24 @@ def tile(cls, op: "DataFrameMerge"):
right = auto_merge_chunks(ctx, right)

method = cls._choose_merge_method(op, left, right)
bloom_filter_chunk_threshold = 10
if isinstance(op.bloom_filter, dict):
bloom_filter_chunk_threshold = op.bloom_filter.pop(
"apply_chunk_size_threshold", bloom_filter_chunk_threshold
)
if cls._if_apply_bloom_filter(
method, op, left, right, bloom_filter_chunk_threshold
):
if cls._if_apply_bloom_filter(method, op, left, right):
if has_unknown_shape(left, right):
yield left.chunks + right.chunks
left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)
if op.how == "inner" and op.bloom_filter:
if has_unknown_shape(left, right):
yield left.chunks + right.chunks
small_one = right if len(left.chunks) > len(right.chunks) else left
logger.debug(
"Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
op,
small_one,
)
left, right = yield from recursive_tile(
*cls._apply_bloom_filter(left, right, left_on, right_on, op)
)
# auto merge after bloom filter
yield [left, right] + left.chunks + right.chunks
left = auto_merge_chunks(ctx, left)
right = auto_merge_chunks(ctx, right)
small_one = right if len(left.chunks) > len(right.chunks) else left
logger.debug(
"Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
op,
small_one,
)
left, right = yield from recursive_tile(
*cls._apply_bloom_filter(left, right, left_on, right_on, op)
)
# auto merge after bloom filter
yield [left, right] + left.chunks + right.chunks
left = auto_merge_chunks(ctx, left)
right = auto_merge_chunks(ctx, right)

if op.method == "auto":
# if method is auto, select new method after auto merge
Expand Down Expand Up @@ -746,7 +802,8 @@ def merge(
method: str = "auto",
auto_merge: str = "both",
auto_merge_threshold: int = 8,
bloom_filter: Union[bool, Dict] = True,
bloom_filter: Union[bool, str] = "auto",
bloom_filter_options: Dict[str, Any] = None,
) -> DataFrame:
"""
Merge DataFrame or named Series objects with a database-style join.
Expand Down Expand Up @@ -839,17 +896,16 @@ def merge(
When how is "inner", merged result could be much smaller than original DataFrame,
if the number of chunks is greater than the threshold,
it will merge small chunks automatically.
bloom_filter: bool or dict, default True
Use bloom filter to optimize merge, you can pass a dict to specify arguments for
bloom filter.
If is a dict:
bloom_filter: bool, str, default "auto"
Use bloom filter to optimize merge
bloom_filter_options: dict
* "max_elements": max elements in bloom filter,
default value is the max size of all input chunks
* "error_rate": error rate, default 0.1.
* "apply_chunk_size_threshold": minimum total number of input chunks to apply bloom filter, default 10,
when the total chunk count of left and right is greater than this threshold, apply bloom filter
* "filter": "large", "small", "both", default "large"
decides to filter on large, small or both DataFrames.
Returns
-------
Expand Down Expand Up @@ -940,8 +996,26 @@ def merge(
raise NotImplementedError(f"{method} merge is not supported")
if auto_merge not in ["both", "none", "before", "after"]: # pragma: no cover
raise ValueError(
f"{auto_merge} can only be `both`, `none`, `before` or `after`"
f"auto_merge can only be `both`, `none`, `before` or `after`, got {auto_merge}"
)
if bloom_filter not in [True, False, "auto"]:
raise ValueError(
f'bloom_filter can only be True, False, or "auto", got {bloom_filter}'
)
if bloom_filter_options:
if not isinstance(bloom_filter_options, dict):
raise TypeError(
f"bloom_filter_options must be a dict, got {type(bloom_filter_options)}"
)
for k, v in bloom_filter_options.items():
if k not in BLOOM_FILTER_OPTIONS:
raise ValueError(
f"Invalid bloom filter option {k}, available: {BLOOM_FILTER_OPTIONS}"
)
if k == "filter" and v not in BLOOM_FILTER_ON_OPTIONS:
raise ValueError(
f"Invalid filter {k}, available: {BLOOM_FILTER_ON_OPTIONS}"
)
op = DataFrameMerge(
how=how,
on=on,
Expand All @@ -958,6 +1032,7 @@ def merge(
auto_merge=auto_merge,
auto_merge_threshold=auto_merge_threshold,
bloom_filter=bloom_filter,
bloom_filter_options=bloom_filter_options,
output_types=[OutputType.dataframe],
)
return op(df, right)
Expand All @@ -975,6 +1050,7 @@ def join(
auto_merge: str = "both",
auto_merge_threshold: int = 8,
bloom_filter: Union[bool, Dict] = True,
bloom_filter_options: Dict[str, Any] = None,
) -> DataFrame:
"""
Join columns of another DataFrame.
Expand Down Expand Up @@ -1029,17 +1105,16 @@ def join(
When how is "inner", merged result could be much smaller than original DataFrame,
if the number of chunks is greater than the threshold,
it will merge small chunks automatically.
bloom_filter: bool or dict, default True
Use bloom filter to optimize merge, you can pass a dict to specify arguments for
bloom filter.
If is a dict:
bloom_filter: bool, str, default "auto"
Use bloom filter to optimize merge
bloom_filter_options: dict
* "max_elements": max elements in bloom filter,
default value is the max size of all input chunks
* "error_rate": error rate, default 0.1.
* "apply_chunk_size_threshold": minimum total number of input chunks to apply bloom filter, default 10,
when the total chunk count of left and right is greater than this threshold, apply bloom filter
* "filter": "large", "small", "both", default "large"
decides to filter on large, small or both DataFrames.
Returns
-------
Expand Down Expand Up @@ -1149,4 +1224,5 @@ def join(
auto_merge=auto_merge,
auto_merge_threshold=auto_merge_threshold,
bloom_filter=bloom_filter,
bloom_filter_options=bloom_filter_options,
)
23 changes: 23 additions & 0 deletions mars/dataframe/merge/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import numpy as np
import pandas as pd
import pytest

from ....core import tile
from ....core.operand import OperandStage
Expand Down Expand Up @@ -75,6 +76,28 @@ def test_merge():
)


def test_merge_invalid_parameters():
    """Verify that merge rejects invalid bloom-filter arguments with clear errors."""
    raw_left = pd.DataFrame(
        np.arange(20).reshape((4, 5)) + 1, columns=["a", "b", "c", "d", "e"]
    )
    raw_right = pd.DataFrame(
        np.arange(20).reshape((5, 4)) + 1, columns=["a", "b", "x", "y"]
    )

    left = from_pandas(raw_left, chunk_size=2)
    right = from_pandas(raw_right, chunk_size=3)

    # bloom_filter accepts only True, False or "auto"
    with pytest.raises(ValueError):
        left.merge(right, bloom_filter="wrong")

    # bloom_filter_options must be a dict ...
    with pytest.raises(TypeError):
        left.merge(right, bloom_filter_options="wrong")

    # ... containing only recognized option keys ...
    with pytest.raises(ValueError):
        left.merge(right, bloom_filter_options={"wrong": 1})

    # ... and a valid value for the "filter" option
    with pytest.raises(ValueError):
        left.merge(right, bloom_filter_options={"filter": "wrong"})


def test_join():
df1 = pd.DataFrame([[1, 3, 3], [4, 2, 6], [7, 8, 9]], index=["a1", "a2", "a3"])
df2 = pd.DataFrame([[1, 2, 3], [1, 5, 6], [7, 8, 9]], index=["a1", "b2", "b3"]) + 1
Expand Down
Loading

0 comments on commit 252322e

Please sign in to comment.