From 696d95ec623b034389d4c4cc598951786dfee598 Mon Sep 17 00:00:00 2001 From: "Xuye (Chris) Qin" Date: Mon, 9 May 2022 16:36:50 +0800 Subject: [PATCH] Disable bloom filter in merge for now (#2967) (cherry picked from commit 4d64416b7a80eed66ad9a68f24ac9ce30981cf43) --- mars/core/entity/executable.py | 2 + mars/dataframe/base/bloom_filter.py | 11 +- mars/dataframe/merge/merge.py | 194 ++++++++++++------ mars/dataframe/merge/tests/test_merge.py | 27 ++- .../merge/tests/test_merge_execution.py | 48 ++++- 5 files changed, 218 insertions(+), 64 deletions(-) diff --git a/mars/core/entity/executable.py b/mars/core/entity/executable.py index 1140ce3d98..e880681dce 100644 --- a/mars/core/entity/executable.py +++ b/mars/core/entity/executable.py @@ -58,6 +58,8 @@ def _thread_body(self): fut.set_result(None) except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except fut.set_exception(ex) + finally: + del session def stop(self): if self._decref_thread: # pragma: no branch diff --git a/mars/dataframe/base/bloom_filter.py b/mars/dataframe/base/bloom_filter.py index 2a2a9b635d..65b9a14591 100644 --- a/mars/dataframe/base/bloom_filter.py +++ b/mars/dataframe/base/bloom_filter.py @@ -42,7 +42,8 @@ class DataFrameBloomFilter(DataFrameOperand, DataFrameOperandMixin): # for build max_elements = Int64Field("max_elements") error_rate = Float64Field("error_rate") - + combine_size = Int64Field("combine_size") + # chunk execution_stage = StringField("execution_stage", default=None) def __init__(self, execution_stage=None, **kwargs): @@ -71,7 +72,7 @@ def tile(cls, op: "DataFrameBloomFilter"): chunks.append(build_op.new_chunk(inputs=[c])) # union all chunk filters - combine_size = options.combine_size + combine_size = op.combine_size while len(chunks) > combine_size: new_chunks = [] for i in range(0, len(chunks), combine_size): @@ -243,6 +244,7 @@ def filter_by_bloom_filter( right_on: Union[str, List], max_elements: int = 10000, error_rate: float = 0.1, + 
combine_size: int = None, ): """ Use bloom filter to filter DataFrame. @@ -261,16 +263,21 @@ def filter_by_bloom_filter( How many elements you expect the filter to hold. error_rate: float error_rate defines accuracy. + combine_size: int + Combine size. Returns ------- DataFrame Filtered df1. """ + if combine_size is None: + combine_size = options.combine_size op = DataFrameBloomFilter( left_on=left_on, right_on=right_on, max_elements=max_elements, error_rate=error_rate, + combine_size=combine_size, ) return op(df1, df2) diff --git a/mars/dataframe/merge/merge.py b/mars/dataframe/merge/merge.py index 8d7dc52a92..dfe24698af 100644 --- a/mars/dataframe/merge/merge.py +++ b/mars/dataframe/merge/merge.py @@ -15,7 +15,7 @@ import itertools from collections import namedtuple from enum import Enum -from typing import Dict, List, Optional, Union, Tuple +from typing import Any, Dict, List, Optional, Union, Tuple import numpy as np import pandas as pd @@ -27,6 +27,7 @@ from ...serialization.serializables import ( AnyField, BoolField, + DictField, StringField, TupleField, KeyField, @@ -50,6 +51,17 @@ import logging logger = logging.getLogger(__name__) +DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD = 10 +# use bloom filter to filter large DataFrame +BLOOM_FILTER_OPTIONS = [ + "max_elements", + "error_rate", + "apply_chunk_size_threshold", + "filter", + "combine_size", +] +BLOOM_FILTER_ON_OPTIONS = ["large", "small", "both"] +DEFAULT_BLOOM_FILTER_ON = "large" class DataFrameMergeAlign(MapReduceOperand, DataFrameOperandMixin): @@ -157,6 +169,7 @@ class DataFrameMerge(DataFrameOperand, DataFrameOperandMixin): auto_merge = StringField("auto_merge") auto_merge_threshold = Int32Field("auto_merge_threshold") bloom_filter = AnyField("bloom_filter") + bloom_filter_options = DictField("bloom_filter_options") # only for broadcast merge split_info = NamedTupleField("split_info") @@ -265,24 +278,39 @@ def _apply_bloom_filter( op: "DataFrameMerge", ): bloom_filter_params = dict() - if 
isinstance(op.bloom_filter, dict): - if "max_elements" in op.bloom_filter: - bloom_filter_params["max_elements"] = op.bloom_filter["max_elements"] - if "error_rate" in op.bloom_filter: - bloom_filter_params["error_rate"] = op.bloom_filter["error_rate"] + bloom_filter_options = op.bloom_filter_options or dict() + for option in ["max_elements", "error_rate", "combine_size"]: + if option in bloom_filter_options: + bloom_filter_params[option] = bloom_filter_options[option] if "max_elements" not in bloom_filter_params: bloom_filter_params["max_elements"] = max( c.shape[0] for c in left.chunks + right.chunks ) - if len(left.chunks) > len(right.chunks): + filter_on = bloom_filter_options.get("filter", DEFAULT_BLOOM_FILTER_ON) + if filter_on == "large": + if len(left.chunks) > len(right.chunks): + left = filter_by_bloom_filter( + left, right, left_on, right_on, **bloom_filter_params + ) + else: + right = filter_by_bloom_filter( + right, left, right_on, left_on, **bloom_filter_params + ) + elif filter_on == "small": + if len(left.chunks) < len(right.chunks): + left = filter_by_bloom_filter( + left, right, left_on, right_on, **bloom_filter_params + ) + else: + right = filter_by_bloom_filter( + right, left, right_on, left_on, **bloom_filter_params + ) + else: + assert filter_on == "both" + # both left = filter_by_bloom_filter( - left, - right, - left_on, - right_on, - **bloom_filter_params, + left, right, left_on, right_on, **bloom_filter_params ) - else: right = filter_by_bloom_filter( right, left, right_on, left_on, **bloom_filter_params ) @@ -587,15 +615,29 @@ def _if_apply_bloom_filter( op: "DataFrameMerge", left: TileableType, right: TileableType, - bloom_filter_chunk_threshold: int, ): - if len(left.chunks + right.chunks) <= bloom_filter_chunk_threshold: + # bloom filter can only work for inner merge + if op.how != "inner" or op.bloom_filter is False: return False - elif method == MergeMethod.shuffle and op.bloom_filter: + elif op.bloom_filter is True: return True - 
else: + + bloom_filter_options = op.bloom_filter_options or dict() + bloom_filter_chunk_threshold = bloom_filter_options.get( + "apply_chunk_size_threshold", DEFAULT_BLOOM_FILTER_CHUNK_THRESHOLD + ) + + # TODO(hks): disable bloom_filter for now, when it is ready, turn it back on + # bloom_filter == auto + if len(left.chunks + right.chunks) <= bloom_filter_chunk_threshold: + # if size of input chunks <= threshold, skip bloom filter + return False + elif method == MergeMethod.shuffle: + # for shuffle, enable bloom filter by default return False + return False + @classmethod def tile(cls, op: "DataFrameMerge"): left = build_concatenated_rows_frame(op.inputs[0]) @@ -612,36 +654,42 @@ def tile(cls, op: "DataFrameMerge"): yield TileStatus([left, right] + left.chunks + right.chunks, progress=0.2) left = auto_merge_chunks(ctx, left) right = auto_merge_chunks(ctx, right) + logger.debug( + "Before merge %s, left data count: %d, chunk size: %d, " + "right data count: %d, chunk_size: %d", + op, + left.shape[0], + len(left.chunks), + right.shape[0], + len(right.chunks), + ) + else: + logger.debug( + "Skip auto merge before %s, left chunk size: %d, right chunk size: %d", + op, + len(left.chunks), + len(right.chunks), + ) method = cls._choose_merge_method(op, left, right) - bloom_filter_chunk_threshold = 10 - if isinstance(op.bloom_filter, dict): - bloom_filter_chunk_threshold = op.bloom_filter.pop( - "apply_chunk_size_threshold", bloom_filter_chunk_threshold - ) - if cls._if_apply_bloom_filter( - method, op, left, right, bloom_filter_chunk_threshold - ): + if cls._if_apply_bloom_filter(method, op, left, right): + if has_unknown_shape(left, right): # pragma: no cover + yield TileStatus(left.chunks + right.chunks, progress=0.3) left_on = _prepare_shuffle_on(op.left_index, op.left_on, op.on) right_on = _prepare_shuffle_on(op.right_index, op.right_on, op.on) - if op.how == "inner" and op.bloom_filter: - if has_unknown_shape(left, right): - yield TileStatus(left.chunks + 
right.chunks, progress=0.3) - small_one = right if len(left.chunks) > len(right.chunks) else left - logger.debug( - "Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.", - op, - small_one, - ) - left, right = yield from recursive_tile( - *cls._apply_bloom_filter(left, right, left_on, right_on, op) - ) - # auto merge after bloom filter - yield TileStatus( - [left, right] + left.chunks + right.chunks, progress=0.5 - ) - left = auto_merge_chunks(ctx, left) - right = auto_merge_chunks(ctx, right) + small_one = right if len(left.chunks) > len(right.chunks) else left + logger.debug( + "Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.", + op, + small_one, + ) + left, right = yield from recursive_tile( + *cls._apply_bloom_filter(left, right, left_on, right_on, op) + ) + # auto merge after bloom filter + yield TileStatus([left, right] + left.chunks + right.chunks, progress=0.5) + left = auto_merge_chunks(ctx, left) + right = auto_merge_chunks(ctx, right) if op.method == "auto": # if method is auto, select new method after auto merge @@ -665,8 +713,18 @@ def tile(cls, op: "DataFrameMerge"): yield TileStatus( ret[0].chunks, progress=0.8 ) # trigger execution for chunks - return [auto_merge_chunks(get_context(), ret[0])] + merged = auto_merge_chunks(get_context(), ret[0]) + logger.debug( + "After merge %s, data size: %d, chunk size: %d", + op, + merged.shape[0], + len(merged.chunks), + ) + return [merged] else: + logger.debug( + "Skip auto merge after %s, chunk size: %d", op, len(ret[0].chunks) + ) return ret @classmethod @@ -750,7 +808,8 @@ def merge( method: str = "auto", auto_merge: str = "both", auto_merge_threshold: int = 8, - bloom_filter: Union[bool, Dict] = True, + bloom_filter: Union[bool, str] = "auto", + bloom_filter_options: Dict[str, Any] = None, ) -> DataFrame: """ Merge DataFrame or named Series objects with a database-style join. 
@@ -843,17 +902,16 @@ def merge( When how is "inner", merged result could be much smaller than original DataFrame, if the number of chunks is greater than the threshold, it will merge small chunks automatically. - bloom_filter: bool or dict, default True - Use bloom filter to optimize merge, you can pass a dict to specify arguments for - bloom filter. - - If is a dict: - + bloom_filter: bool, str, default "auto" + Use bloom filter to optimize merge + bloom_filter_options: dict * "max_elements": max elements in bloom filter, default value is the max size of all input chunks * "error_rate": error raite, default 0.1. * "apply_chunk_size_threshold": min chunk size of input chunks to apply bloom filter, default 10 when chunk size of left and right is greater than this threshold, apply bloom filter + * "filter": "large", "small", "both", default "large" + decides to filter on large, small or both DataFrames. Returns ------- @@ -944,8 +1002,26 @@ def merge( raise NotImplementedError(f"{method} merge is not supported") if auto_merge not in ["both", "none", "before", "after"]: # pragma: no cover raise ValueError( - f"{auto_merge} can only be `both`, `none`, `before` or `after`" + f"auto_merge can only be `both`, `none`, `before` or `after`, got {auto_merge}" + ) + if bloom_filter not in [True, False, "auto"]: + raise ValueError( + f'bloom_filter can only be True, False, or "auto", got {bloom_filter}' ) + if bloom_filter_options: + if not isinstance(bloom_filter_options, dict): + raise TypeError( + f"bloom_filter_options must be a dict, got {type(bloom_filter_options)}" + ) + for k, v in bloom_filter_options.items(): + if k not in BLOOM_FILTER_OPTIONS: + raise ValueError( + f"Invalid bloom filter option {k}, available: {BLOOM_FILTER_OPTIONS}" + ) + if k == "filter" and v not in BLOOM_FILTER_ON_OPTIONS: + raise ValueError( + f"Invalid filter {k}, available: {BLOOM_FILTER_ON_OPTIONS}" + ) op = DataFrameMerge( how=how, on=on, @@ -962,6 +1038,7 @@ def merge( 
auto_merge=auto_merge, auto_merge_threshold=auto_merge_threshold, bloom_filter=bloom_filter, + bloom_filter_options=bloom_filter_options, output_types=[OutputType.dataframe], ) return op(df, right) @@ -979,6 +1056,7 @@ def join( auto_merge: str = "both", auto_merge_threshold: int = 8, bloom_filter: Union[bool, Dict] = True, + bloom_filter_options: Dict[str, Any] = None, ) -> DataFrame: """ Join columns of another DataFrame. @@ -1033,17 +1111,16 @@ def join( When how is "inner", merged result could be much smaller than original DataFrame, if the number of chunks is greater than the threshold, it will merge small chunks automatically. - bloom_filter: bool or dict, default True - Use bloom filter to optimize merge, you can pass a dict to specify arguments for - bloom filter. - - If is a dict: - + bloom_filter: bool, str, default "auto" + Use bloom filter to optimize merge + bloom_filter_options: dict * "max_elements": max elements in bloom filter, default value is the max size of all input chunks * "error_rate": error rate, default 0.1. * "apply_chunk_size_threshold": min chunk size of input chunks to apply bloom filter, default 10 when chunk size of left and right is greater than this threshold, apply bloom filter + * "filter": "large", "small", "both", default "large" + decides to filter on large, small or both DataFrames. 
Returns ------- @@ -1153,4 +1230,5 @@ def join( auto_merge=auto_merge, auto_merge_threshold=auto_merge_threshold, bloom_filter=bloom_filter, + bloom_filter_options=bloom_filter_options, ) diff --git a/mars/dataframe/merge/tests/test_merge.py b/mars/dataframe/merge/tests/test_merge.py index c180a9d212..4e1f6c354e 100644 --- a/mars/dataframe/merge/tests/test_merge.py +++ b/mars/dataframe/merge/tests/test_merge.py @@ -14,6 +14,7 @@ import numpy as np import pandas as pd +import pytest from ....core import tile from ....core.operand import OperandStage @@ -75,6 +76,28 @@ def test_merge(): ) +def test_merge_invalid_parameters(): + pdf1 = pd.DataFrame( + np.arange(20).reshape((4, 5)) + 1, columns=["a", "b", "c", "d", "e"] + ) + pdf2 = pd.DataFrame(np.arange(20).reshape((5, 4)) + 1, columns=["a", "b", "x", "y"]) + + df1 = from_pandas(pdf1, chunk_size=2) + df2 = from_pandas(pdf2, chunk_size=3) + + with pytest.raises(ValueError): + df1.merge(df2, bloom_filter="wrong") + + with pytest.raises(TypeError): + df1.merge(df2, bloom_filter_options="wrong") + + with pytest.raises(ValueError): + df1.merge(df2, bloom_filter_options={"wrong": 1}) + + with pytest.raises(ValueError): + df1.merge(df2, bloom_filter_options={"filter": "wrong"}) + + def test_join(): df1 = pd.DataFrame([[1, 3, 3], [4, 2, 6], [7, 8, 9]], index=["a1", "a2", "a3"]) df2 = pd.DataFrame([[1, 2, 3], [1, 5, 6], [7, 8, 9]], index=["a1", "b2", "b3"]) + 1 @@ -92,7 +115,7 @@ def test_join(): ] for kw in parameters: - df = mdf1.join(mdf2, **kw) + df = mdf1.join(mdf2, auto_merge="none", bloom_filter=False, **kw) df = tile(df) assert df.chunk_shape == (3, 1) @@ -140,7 +163,7 @@ def test_join_on(): ] for kw in parameters: - df = mdf1.join(mdf2, **kw) + df = mdf1.join(mdf2, auto_merge="none", bloom_filter=False, **kw) df = tile(df) assert df.chunk_shape == (3, 1) diff --git a/mars/dataframe/merge/tests/test_merge_execution.py b/mars/dataframe/merge/tests/test_merge_execution.py index 50a0f0d217..4bb8292ea1 100644 --- 
a/mars/dataframe/merge/tests/test_merge_execution.py +++ b/mars/dataframe/merge/tests/test_merge_execution.py @@ -510,7 +510,8 @@ def test_merge_with_bloom_filter(setup): df1.merge( df2, on="col2", - bloom_filter={"max_elements": 100, "error_rate": 0.01}, + bloom_filter=True, + bloom_filter_options={"max_elements": 100, "error_rate": 0.01}, auto_merge="none", ) .execute() @@ -598,6 +599,43 @@ def test_merge_with_bloom_filter(setup): ) +@pytest.mark.parametrize("filter", ["small", "large", "both"]) +def test_merge_with_bloom_filter_options(setup, filter): + ns = np.random.RandomState(0) + raw_df1 = pd.DataFrame( + { + "col1": ns.random(100), + "col2": ns.randint(0, 10, size=(100,)), + "col3": ns.randint(0, 10, size=(100,)), + } + ) + raw_df2 = pd.DataFrame( + { + "col1": ns.random(100), + "col2": ns.randint(0, 10, size=(100,)), + "col3": ns.randint(0, 10, size=(100,)), + } + ) + + df1 = from_pandas(raw_df1, chunk_size=25) + df2 = from_pandas(raw_df2, chunk_size=30) + m = df1.merge( + df2, + on="col2", + auto_merge="none", + method="shuffle", + bloom_filter=True, + bloom_filter_options={"filter": filter, "apply_chunk_size_threshold": 0}, + ) + + expected = raw_df1.merge(raw_df2, on="col2") + result = m.execute().fetch() + pd.testing.assert_frame_equal( + expected.sort_index().sort_values(by=["col1_x"]).reset_index(drop=True), + result.sort_index().sort_values(by=["col1_x"]).reset_index(drop=True), + ) + + @pytest.mark.parametrize("auto_merge", ["none", "both", "before", "after"]) def test_merge_on_duplicate_columns(setup, auto_merge): raw1 = pd.DataFrame( @@ -613,7 +651,13 @@ def test_merge_on_duplicate_columns(setup, auto_merge): df1 = from_pandas(raw1, chunk_size=2) df2 = from_pandas(raw2, chunk_size=3) - r = df1.merge(df2, left_on="lkey", right_on="rkey", auto_merge=auto_merge) + r = df1.merge( + df2, + left_on="lkey", + right_on="rkey", + auto_merge=auto_merge, + auto_merge_threshold=0, + ) result = r.execute().fetch() expected = raw1.merge(raw2, left_on="lkey", 
right_on="rkey") pd.testing.assert_frame_equal(expected, result)