Skip to content

Commit

Permalink
Disable bloom filter in merge for now (mars-project#2967)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4d64416)
  • Loading branch information
Xuye (Chris) Qin authored and xuye.qin committed May 23, 2022
1 parent f849474 commit 696d95e
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 64 deletions.
2 changes: 2 additions & 0 deletions mars/core/entity/executable.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def _thread_body(self):
fut.set_result(None)
except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except
fut.set_exception(ex)
finally:
del session

def stop(self):
if self._decref_thread: # pragma: no branch
Expand Down
11 changes: 9 additions & 2 deletions mars/dataframe/base/bloom_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class DataFrameBloomFilter(DataFrameOperand, DataFrameOperandMixin):
# for build
max_elements = Int64Field("max_elements")
error_rate = Float64Field("error_rate")

combine_size = Int64Field("combine_size")
# chunk
execution_stage = StringField("execution_stage", default=None)

def __init__(self, execution_stage=None, **kwargs):
Expand Down Expand Up @@ -71,7 +72,7 @@ def tile(cls, op: "DataFrameBloomFilter"):
chunks.append(build_op.new_chunk(inputs=[c]))

# union all chunk filters
combine_size = options.combine_size
combine_size = op.combine_size
while len(chunks) > combine_size:
new_chunks = []
for i in range(0, len(chunks), combine_size):
Expand Down Expand Up @@ -243,6 +244,7 @@ def filter_by_bloom_filter(
right_on: Union[str, List],
max_elements: int = 10000,
error_rate: float = 0.1,
combine_size: int = None,
):
"""
Use bloom filter to filter DataFrame.
Expand All @@ -261,16 +263,21 @@ def filter_by_bloom_filter(
How many elements you expect the filter to hold.
error_rate: float
error_rate defines accuracy.
combine_size: int
Combine size.
Returns
-------
DataFrame
Filtered df1.
"""
if combine_size is None:
combine_size = options.combine_size
op = DataFrameBloomFilter(
left_on=left_on,
right_on=right_on,
max_elements=max_elements,
error_rate=error_rate,
combine_size=combine_size,
)
return op(df1, df2)
194 changes: 136 additions & 58 deletions mars/dataframe/merge/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import itertools
from collections import namedtuple
from enum import Enum
from typing import Dict, List, Optional, Union, Tuple
from typing import Any, Dict, List, Optional, Union, Tuple

import numpy as np
import pandas as pd
Expand All @@ -27,6 +27,7 @@
from ...serialization.serializables import (
AnyField,
BoolField,
DictField,
StringField,
TupleField,
KeyField,
Expand All @@ -50,6 +51,17 @@
import logging

logger = logging.getLogger(__name__)
DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD = 10
# use bloom filter to filter large DataFrame
BLOOM_FILTER_OPTIONS = [
"max_elements",
"error_rate",
"apply_chunk_size_threshold",
"filter",
"combine_size",
]
BLOOM_FILTER_ON_OPTIONS = ["large", "small", "both"]
DEFAULT_BLOOM_FILTER_ON = "large"


class DataFrameMergeAlign(MapReduceOperand, DataFrameOperandMixin):
Expand Down Expand Up @@ -157,6 +169,7 @@ class DataFrameMerge(DataFrameOperand, DataFrameOperandMixin):
auto_merge = StringField("auto_merge")
auto_merge_threshold = Int32Field("auto_merge_threshold")
bloom_filter = AnyField("bloom_filter")
bloom_filter_options = DictField("bloom_filter_options")

# only for broadcast merge
split_info = NamedTupleField("split_info")
Expand Down Expand Up @@ -265,24 +278,39 @@ def _apply_bloom_filter(
op: "DataFrameMerge",
):
bloom_filter_params = dict()
if isinstance(op.bloom_filter, dict):
if "max_elements" in op.bloom_filter:
bloom_filter_params["max_elements"] = op.bloom_filter["max_elements"]
if "error_rate" in op.bloom_filter:
bloom_filter_params["error_rate"] = op.bloom_filter["error_rate"]
bloom_filter_options = op.bloom_filter_options or dict()
for option in ["max_elements", "error_rate", "combine_size"]:
if option in bloom_filter_options:
bloom_filter_params[option] = bloom_filter_options[option]
if "max_elements" not in bloom_filter_params:
bloom_filter_params["max_elements"] = max(
c.shape[0] for c in left.chunks + right.chunks
)
if len(left.chunks) > len(right.chunks):
filter_on = bloom_filter_options.get("filter", DEFAULT_BLOOM_FILTER_ON)
if filter_on == "large":
if len(left.chunks) > len(right.chunks):
left = filter_by_bloom_filter(
left, right, left_on, right_on, **bloom_filter_params
)
else:
right = filter_by_bloom_filter(
right, left, right_on, left_on, **bloom_filter_params
)
elif filter_on == "small":
if len(left.chunks) < len(right.chunks):
left = filter_by_bloom_filter(
left, right, left_on, right_on, **bloom_filter_params
)
else:
right = filter_by_bloom_filter(
right, left, right_on, left_on, **bloom_filter_params
)
else:
assert filter_on == "both"
# both
left = filter_by_bloom_filter(
left,
right,
left_on,
right_on,
**bloom_filter_params,
left, right, left_on, right_on, **bloom_filter_params
)
else:
right = filter_by_bloom_filter(
right, left, right_on, left_on, **bloom_filter_params
)
Expand Down Expand Up @@ -587,15 +615,29 @@ def _if_apply_bloom_filter(
op: "DataFrameMerge",
left: TileableType,
right: TileableType,
bloom_filter_chunk_threshold: int,
):
if len(left.chunks + right.chunks) <= bloom_filter_chunk_threshold:
# bloom filter can only work for inner merge
if op.how != "inner" or op.bloom_filter is False:
return False
elif method == MergeMethod.shuffle and op.bloom_filter:
elif op.bloom_filter is True:
return True
else:

bloom_filter_options = op.bloom_filter_options or dict()
bloom_filter_chunk_threshold = bloom_filter_options.get(
"apply_chunk_size_threshold", DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD
)

# TODO(hks): disable bloom_filter for now, when it is ready, turn it on them
# bloom_filter == auto
if len(left.chunks + right.chunks) <= bloom_filter_chunk_threshold:
# if size of input chunks <= threshold, skip bloom filter
return False
elif method == MergeMethod.shuffle:
# for shuffle, enable bloom filter by default
return False

return False

@classmethod
def tile(cls, op: "DataFrameMerge"):
left = build_concatenated_rows_frame(op.inputs[0])
Expand All @@ -612,36 +654,42 @@ def tile(cls, op: "DataFrameMerge"):
yield TileStatus([left, right] + left.chunks + right.chunks, progress=0.2)
left = auto_merge_chunks(ctx, left)
right = auto_merge_chunks(ctx, right)
logger.debug(
"Before merge %s, left data count: %d, chunk size: %d, "
"right data count: %d, chunk_size: %d",
op,
left.shape[0],
len(left.chunks),
right.shape[0],
len(right.chunks),
)
else:
logger.debug(
"Skip auto merge before %s, left chunk size: %d, right chunk size: %d",
op,
len(left.chunks),
len(right.chunks),
)

method = cls._choose_merge_method(op, left, right)
bloom_filter_chunk_threshold = 10
if isinstance(op.bloom_filter, dict):
bloom_filter_chunk_threshold = op.bloom_filter.pop(
"apply_chunk_size_threshold", bloom_filter_chunk_threshold
)
if cls._if_apply_bloom_filter(
method, op, left, right, bloom_filter_chunk_threshold
):
if cls._if_apply_bloom_filter(method, op, left, right):
if has_unknown_shape(left, right): # pragma: no cover
yield TileStatus(left.chunks + right.chunks, progress=0.3)
left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on)
right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on)
if op.how == "inner" and op.bloom_filter:
if has_unknown_shape(left, right):
yield TileStatus(left.chunks + right.chunks, progress=0.3)
small_one = right if len(left.chunks) > len(right.chunks) else left
logger.debug(
"Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
op,
small_one,
)
left, right = yield from recursive_tile(
*cls._apply_bloom_filter(left, right, left_on, right_on, op)
)
# auto merge after bloom filter
yield TileStatus(
[left, right] + left.chunks + right.chunks, progress=0.5
)
left = auto_merge_chunks(ctx, left)
right = auto_merge_chunks(ctx, right)
small_one = right if len(left.chunks) > len(right.chunks) else left
logger.debug(
"Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
op,
small_one,
)
left, right = yield from recursive_tile(
*cls._apply_bloom_filter(left, right, left_on, right_on, op)
)
# auto merge after bloom filter
yield TileStatus([left, right] + left.chunks + right.chunks, progress=0.5)
left = auto_merge_chunks(ctx, left)
right = auto_merge_chunks(ctx, right)

if op.method == "auto":
# if method is auto, select new method after auto merge
Expand All @@ -665,8 +713,18 @@ def tile(cls, op: "DataFrameMerge"):
yield TileStatus(
ret[0].chunks, progress=0.8
) # trigger execution for chunks
return [auto_merge_chunks(get_context(), ret[0])]
merged = auto_merge_chunks(get_context(), ret[0])
logger.debug(
"After merge %s, data size: %d, chunk size: %d",
op,
merged.shape[0],
len(merged.chunks),
)
return [merged]
else:
logger.debug(
"Skip auto merge after %s, chunk size: %d", op, len(ret[0].chunks)
)
return ret

@classmethod
Expand Down Expand Up @@ -750,7 +808,8 @@ def merge(
method: str = "auto",
auto_merge: str = "both",
auto_merge_threshold: int = 8,
bloom_filter: Union[bool, Dict] = True,
bloom_filter: Union[bool, str] = "auto",
bloom_filter_options: Dict[str, Any] = None,
) -> DataFrame:
"""
Merge DataFrame or named Series objects with a database-style join.
Expand Down Expand Up @@ -843,17 +902,16 @@ def merge(
When how is "inner", merged result could be much smaller than original DataFrame,
if the number of chunks is greater than the threshold,
it will merge small chunks automatically.
bloom_filter: bool or dict, default True
Use bloom filter to optimize merge, you can pass a dict to specify arguments for
bloom filter.
If is a dict:
bloom_filter: bool, str, default "auto"
Use bloom filter to optimize merge
bloom_filter_options: dict
* "max_elements": max elements in bloom filter,
default value is the max size of all input chunks
* "error_rate": error raite, default 0.1.
* "apply_chunk_size_threshold": min chunk size of input chunks to apply bloom filter, default 10
when chunk size of left and right is greater than this threshold, apply bloom filter
* "filter": "large", "small", "both", default "large"
decides to filter on large, small or both DataFrames.
Returns
-------
Expand Down Expand Up @@ -944,8 +1002,26 @@ def merge(
raise NotImplementedError(f"{method} merge is not supported")
if auto_merge not in ["both", "none", "before", "after"]: # pragma: no cover
raise ValueError(
f"{auto_merge} can only be `both`, `none`, `before` or `after`"
f"auto_merge can only be `both`, `none`, `before` or `after`, got {auto_merge}"
)
if bloom_filter not in [True, False, "auto"]:
raise ValueError(
f'bloom_filter can only be True, False, or "auto", got {bloom_filter}'
)
if bloom_filter_options:
if not isinstance(bloom_filter_options, dict):
raise TypeError(
f"bloom_filter_options must be a dict, got {type(bloom_filter_options)}"
)
for k, v in bloom_filter_options.items():
if k not in BLOOM_FILTER_OPTIONS:
raise ValueError(
f"Invalid bloom filter option {k}, available: {BLOOM_FILTER_OPTIONS}"
)
if k == "filter" and v not in BLOOM_FILTER_ON_OPTIONS:
raise ValueError(
f"Invalid filter {k}, available: {BLOOM_FILTER_ON_OPTIONS}"
)
op = DataFrameMerge(
how=how,
on=on,
Expand All @@ -962,6 +1038,7 @@ def merge(
auto_merge=auto_merge,
auto_merge_threshold=auto_merge_threshold,
bloom_filter=bloom_filter,
bloom_filter_options=bloom_filter_options,
output_types=[OutputType.dataframe],
)
return op(df, right)
Expand All @@ -979,6 +1056,7 @@ def join(
auto_merge: str = "both",
auto_merge_threshold: int = 8,
bloom_filter: Union[bool, Dict] = True,
bloom_filter_options: Dict[str, Any] = None,
) -> DataFrame:
"""
Join columns of another DataFrame.
Expand Down Expand Up @@ -1033,17 +1111,16 @@ def join(
When how is "inner", merged result could be much smaller than original DataFrame,
if the number of chunks is greater than the threshold,
it will merge small chunks automatically.
bloom_filter: bool or dict, default True
Use bloom filter to optimize merge, you can pass a dict to specify arguments for
bloom filter.
If is a dict:
bloom_filter: bool, str, default "auto"
Use bloom filter to optimize merge
bloom_filter_options: dict
* "max_elements": max elements in bloom filter,
default value is the max size of all input chunks
* "error_rate": error raite, default 0.1.
* "apply_chunk_size_threshold": min chunk size of input chunks to apply bloom filter, default 10
when chunk size of left and right is greater than this threshold, apply bloom filter
* "filter": "large", "small", "both", default "large"
decides to filter on large, small or both DataFrames.
Returns
-------
Expand Down Expand Up @@ -1153,4 +1230,5 @@ def join(
auto_merge=auto_merge,
auto_merge_threshold=auto_merge_threshold,
bloom_filter=bloom_filter,
bloom_filter_options=bloom_filter_options,
)
Loading

0 comments on commit 696d95e

Please sign in to comment.