diff --git a/mars/core/operand/core.py b/mars/core/operand/core.py
index 1e43813a67..05f6841192 100644
--- a/mars/core/operand/core.py
+++ b/mars/core/operand/core.py
@@ -335,20 +335,22 @@ def post_execute(cls, ctx: Union[dict, Context], op: OperandType):
     def estimate_size(cls, ctx: dict, op: OperandType):
         from .fetch import FetchShuffle

-        exec_size = 0
+        # when sizes of all outputs are deterministic, return directly
         outputs = op.outputs
-        pure_dep_keys = set(
-            inp.key
-            for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
-            if is_dep
-        )
         if all(
             not c.is_sparse() and hasattr(c, "nbytes") and not np.isnan(c.nbytes)
             for c in outputs
         ):
             for out in outputs:
                 ctx[out.key] = (out.nbytes, out.nbytes)
+            return

+        pure_dep_keys = set(
+            inp.key
+            for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
+            if is_dep
+        )
+        exec_sizes = [0]
         for inp in op.inputs or ():
             if inp.key in pure_dep_keys:
                 continue
@@ -361,13 +363,16 @@ def estimate_size(cls, ctx: dict, op: OperandType):
                 # execution size of a specific data chunk may be
                 # larger than stored type due to objects
                 for key, shape in keys_and_shapes:
-                    exec_size += ctx[key][0]
+                    exec_sizes.append(ctx[key][0])
             except KeyError:
                 if not op.sparse:
                     inp_size = calc_data_size(inp)
                     if not np.isnan(inp_size):
-                        exec_size += inp_size
-        exec_size = int(exec_size)
+                        exec_sizes.append(inp_size)
+        if any(c.is_sparse() for c in op.inputs):
+            exec_size = sum(exec_sizes)
+        else:
+            exec_size = max(exec_sizes)

         total_out_size = 0
         chunk_sizes = dict()
@@ -408,7 +413,7 @@ def estimate_size(cls, ctx: dict, op: OperandType):
                 max_sparse_size = np.nan
             if not np.isnan(max_sparse_size):
                 result_size = min(result_size, max_sparse_size)
-            ctx[out.key] = (result_size, exec_size * memory_scale // len(outputs))
+            ctx[out.key] = (result_size, int(exec_size * memory_scale // len(outputs)))

     @classmethod
     def concat_tileable_chunks(cls, tileable: TileableType):
diff --git a/mars/lib/groupby_wrapper.py b/mars/lib/groupby_wrapper.py
index 1968d32980..d15e7921e8 100644
--- a/mars/lib/groupby_wrapper.py
+++ b/mars/lib/groupby_wrapper.py
@@ -20,6 +20,7 @@
 import pandas as pd
 from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy

+from ..utils import estimate_pandas_size
 from .version import parse as parse_version

 _HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,9 @@ def __sizeof__(self):
             getattr(self.groupby_obj.grouper, "_cache", None)
         )

+    def estimate_size(self):
+        return estimate_pandas_size(self.obj) + estimate_pandas_size(self.obj.index)
+
     def __reduce__(self):
         return (
             type(self).from_tuple,
diff --git a/mars/lib/tests/test_lib.py b/mars/lib/tests/test_lib.py
index 3d538a3e64..7a52759597 100644
--- a/mars/lib/tests/test_lib.py
+++ b/mars/lib/tests/test_lib.py
@@ -19,7 +19,7 @@
 import numpy as np

 from ...tests.core import assert_groupby_equal
-from ...utils import calc_data_size
+from ...utils import calc_data_size, estimate_pandas_size
 from ..groupby_wrapper import wrapped_groupby


@@ -42,6 +42,7 @@ def test_groupby_wrapper():
     assert grouped.is_frame is True
     assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
     assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
+    assert grouped.estimate_size() > estimate_pandas_size(grouped.groupby_obj)

     grouped = conv_func(wrapped_groupby(df, level=0).C)
     assert_groupby_equal(grouped, df.groupby(level=0).C)
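The `estimate_size` change above stops summing every input into a single `exec_size` and instead collects an `exec_sizes` list, taking `sum()` only when some input is sparse and `max()` otherwise; the leading `[0]` keeps `max()` well-defined for operands without inputs. A minimal sketch of that accounting with toy numbers (not Mars code):

```python
# Toy illustration of the exec_sizes accounting in the patch above:
# sparse inputs are summed, dense inputs contribute only their maximum.
def estimate_exec_size(input_sizes, any_sparse):
    exec_sizes = [0] + list(input_sizes)  # leading 0 mirrors exec_sizes = [0]
    return sum(exec_sizes) if any_sparse else max(exec_sizes)

assert estimate_exec_size([400, 1024, 16], any_sparse=False) == 1024  # max
assert estimate_exec_size([400, 1024, 16], any_sparse=True) == 1440   # sum
```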
diff --git a/mars/services/scheduling/worker/execution.py b/mars/services/scheduling/worker/execution.py
index 42417aaa93..36c86ef0d6 100644
--- a/mars/services/scheduling/worker/execution.py
+++ b/mars/services/scheduling/worker/execution.py
@@ -207,19 +207,22 @@ async def _collect_input_sizes(
             *(storage_api.get_infos.delay(k) for k in fetch_keys)
         )

+        # compute memory quota sizes. when data is located in shared memory,
+        # the cost should be the difference between the deserialized memory
+        # cost and the serialized cost; otherwise we take the deserialized cost
         for key, meta, infos in zip(fetch_keys, fetch_metas, data_infos):
             level = functools.reduce(operator.or_, (info.level for info in infos))
             if level & StorageLevel.MEMORY:
                 mem_cost = max(0, meta["memory_size"] - meta["store_size"])
             else:
                 mem_cost = meta["memory_size"]
-            sizes[key] = (mem_cost, mem_cost)
+            sizes[key] = (meta["store_size"], mem_cost)

         return sizes

     @classmethod
     def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
-        size_context = {k: (s, 0) for k, (s, _c) in input_sizes.items()}
+        size_context = dict(input_sizes.items())
         graph = subtask.chunk_graph

         key_to_ops = defaultdict(set)
@@ -243,7 +246,7 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):

         visited_op_keys = set()
         total_memory_cost = 0
-        max_memory_cost = 0
+        max_memory_cost = sum(calc_size for _, calc_size in size_context.values())
         while key_stack:
             key = key_stack.pop()
             op = key_to_ops[key][0]
@@ -255,24 +258,31 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
             total_memory_cost += calc_cost
             max_memory_cost = max(total_memory_cost, max_memory_cost)
-            result_cost = sum(size_context[out.key][0] for out in op.outputs)
-            total_memory_cost += result_cost - calc_cost
+            if not isinstance(op, Fetch):
+                # when the calculation result is stored, the memory cost of
+                # the calculation can be replaced with the result memory cost
+                result_cost = sum(size_context[out.key][0] for out in op.outputs)
+                total_memory_cost += result_cost - calc_cost

-            visited_op_keys.add(op.key)
+            visited_op_keys.add(key)

             for succ_op_key in op_key_graph.iter_successors(key):
                 pred_ref_count[succ_op_key] -= 1
                 if pred_ref_count[succ_op_key] == 0:
                     key_stack.append(succ_op_key)
+
             for pred_op_key in op_key_graph.iter_predecessors(key):
                 succ_ref_count[pred_op_key] -= 1
                 if succ_ref_count[pred_op_key] == 0:
+                    pred_op = key_to_ops[pred_op_key][0]
+                    # when clearing fetched data, subtract memory size; otherwise store size
+                    account_idx = 1 if isinstance(pred_op, Fetch) else 0
                     pop_result_cost = sum(
-                        size_context.pop(out.key, (0, 0))[0]
+                        size_context.pop(out.key, (0, 0))[account_idx]
                         for out in key_to_ops[pred_op_key][0].outputs
                     )
                     total_memory_cost -= pop_result_cost

-        return sum(t[1] for t in size_context.values()), max_memory_cost
+        return sum(t[0] for t in size_context.values()), max_memory_cost

     @classmethod
     def _check_cancelling(cls, subtask_info: SubtaskExecutionInfo):
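After this patch, `_collect_input_sizes` returns a `(store_size, mem_cost)` pair per input instead of `(mem_cost, mem_cost)`, and `_estimate_sizes` uses the two slots to account for fetched and computed data separately. A condensed sketch of the quota arithmetic (hypothetical `metas` and `in_memory` arguments, not the real storage API):

```python
from typing import Dict, Tuple


def quota_sizes(
    metas: Dict[str, dict], in_memory: Dict[str, bool]
) -> Dict[str, Tuple[int, int]]:
    # mirrors _collect_input_sizes: map each input key to (store_size, mem_cost)
    sizes = {}
    for key, meta in metas.items():
        if in_memory[key]:
            # already in shared memory: charge only the extra memory that
            # deserialization adds on top of the stored bytes
            mem_cost = max(0, meta["memory_size"] - meta["store_size"])
        else:
            # must be loaded from spill or disk: charge the full in-memory size
            mem_cost = meta["memory_size"]
        sizes[key] = (meta["store_size"], mem_cost)
    return sizes


meta = {"a": {"memory_size": 2048, "store_size": 1024}}
assert quota_sizes(meta, {"a": True}) == {"a": (1024, 1024)}   # shared memory
assert quota_sizes(meta, {"a": False}) == {"a": (1024, 2048)}  # disk-resident
```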
diff --git a/mars/services/scheduling/worker/tests/test_execution.py b/mars/services/scheduling/worker/tests/test_execution.py
index cc812f6b06..e50a3714da 100644
--- a/mars/services/scheduling/worker/tests/test_execution.py
+++ b/mars/services/scheduling/worker/tests/test_execution.py
@@ -21,11 +21,18 @@
 from typing import Tuple

 import numpy as np
+import pandas as pd
 import pytest

 from ..... import oscar as mo
 from ..... import remote as mr
-from .....core import ChunkGraph, ChunkGraphBuilder, TileableGraph, TileableGraphBuilder
+from .....core import (
+    ChunkGraph,
+    ChunkGraphBuilder,
+    TileableGraph,
+    TileableGraphBuilder,
+    OutputType,
+)
 from .....remote.core import RemoteFunction
 from .....tensor.fetch import TensorFetch
 from .....tensor.arithmetic import TensorTreeAdd
@@ -384,6 +391,47 @@ def delay_fun(delay, _inp1):
     )


+def test_estimate_size():
+    from ..execution import SubtaskExecutionActor
+    from .....dataframe.arithmetic import DataFrameAdd
+    from .....dataframe.fetch import DataFrameFetch
+    from .....dataframe.utils import parse_index
+
+    index_value = parse_index(pd.Int64Index([10, 20, 30]))
+
+    input1 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
+        [], _key="INPUT1", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
+    )
+    input2 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
+        [], _key="INPUT2", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
+    )
+    result_chunk = DataFrameAdd(
+        axis=0, output_types=[OutputType.series], lhs=input1, rhs=input2
+    ).new_chunk(
+        [input1, input2],
+        _key="ADD_RESULT",
+        shape=(np.nan,),
+        dtype=np.dtype("O"),
+        index_value=index_value,
+    )
+
+    chunk_graph = ChunkGraph([result_chunk])
+    chunk_graph.add_node(input1)
+    chunk_graph.add_node(input2)
+    chunk_graph.add_node(result_chunk)
+    chunk_graph.add_edge(input1, result_chunk)
+    chunk_graph.add_edge(input2, result_chunk)
+
+    input_sizes = {
+        "INPUT1": (1024, 1024),
+        "INPUT2": (1024, 1024),
+    }
+
+    subtask = Subtask("test_subtask", session_id="session_id", chunk_graph=chunk_graph)
+    result = SubtaskExecutionActor._estimate_sizes(subtask, input_sizes)
+    assert result[0] == 1024
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize("actor_pool", [(1, False)], indirect=True)
 async def test_cancel_without_kill(actor_pool):
diff --git a/mars/tests/test_utils.py b/mars/tests/test_utils.py
index 412097cecb..63888d8504 100644
--- a/mars/tests/test_utils.py
+++ b/mars/tests/test_utils.py
@@ -188,8 +188,8 @@ def test_lazy_import():
     old_sys_path = sys.path
     mock_mod = textwrap.dedent(
         """
-        __version__ = '0.1.0b1'
-        """.strip()
+            __version__ = '0.1.0b1'
+            """.strip()
     )

     temp_dir = tempfile.mkdtemp(prefix="mars-utils-test-")
@@ -482,6 +482,40 @@ def test_readable_size():
     assert utils.readable_size(14354000000000) == "13.05T"


+def test_estimate_pandas_size():
+    df1 = pd.DataFrame(np.random.rand(50, 10))
+    assert utils.estimate_pandas_size(df1) == sys.getsizeof(df1)
+
+    df2 = pd.DataFrame(np.random.rand(1000, 10))
+    assert utils.estimate_pandas_size(df2) == sys.getsizeof(df2)
+
+    df3 = pd.DataFrame(
+        {
+            "A": np.random.choice(["abcd", "def", "gh"], size=(1000,)),
+            "B": np.random.rand(1000),
+            "C": np.random.rand(1000),
+        }
+    )
+    assert utils.estimate_pandas_size(df3) != sys.getsizeof(df3)
+
+    s1 = pd.Series(np.random.rand(1000))
+    assert utils.estimate_pandas_size(s1) == sys.getsizeof(s1)
+
+    from ..dataframe.arrays import ArrowStringArray
+
+    array = ArrowStringArray(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
+    s2 = pd.Series(array)
+    assert utils.estimate_pandas_size(s2) == sys.getsizeof(s2)
+
+    s3 = pd.Series(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
+    assert utils.estimate_pandas_size(s3) != sys.getsizeof(s3)
+
+    idx1 = pd.MultiIndex.from_arrays(
+        [np.arange(0, 1000), np.random.choice(["abcd", "def", "gh"], size=(1000,))]
+    )
+    assert utils.estimate_pandas_size(idx1) != sys.getsizeof(idx1)
+
+
 @require_ray
 def test_web_serialize_lambda():
     register_ray_serializers()
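The `test_estimate_pandas_size` cases above pin down the contract of the estimator added to `mars/utils.py` below: for fixed-width numeric or Arrow-backed data, `sys.getsizeof` is already exact, so the estimate equals it; object columns instead get a sampled estimate, which is why those asserts use `!=`. A standalone demonstration of the sampling idea (toy data; the estimate is approximate by design):

```python
import sys

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "A": np.random.choice(["abcd", "def", "gh"], size=(100000,)),
        "B": np.random.rand(100000),
    }
)

# exact, but walks every Python string object in column "A"
exact = sys.getsizeof(df)

# sampled: size 10 random rows, then scale linearly to the full length
max_samples = 10
indices = np.sort(np.random.choice(len(df), size=max_samples, replace=False))
estimate = sys.getsizeof(df.iloc[indices]) * len(df) // max_samples

print(exact, estimate)  # the estimate lands near the exact figure
```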
diff --git a/mars/utils.py b/mars/utils.py
index 450e792930..2aece25601 100644
--- a/mars/utils.py
+++ b/mars/utils.py
@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
         return sum(calc_data_size(c) for c in dt)

     shape = getattr(dt, "shape", None) or shape
-    if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
-        return sys.getsizeof(dt)
+    if isinstance(dt, (pd.DataFrame, pd.Series)):
+        return estimate_pandas_size(dt)
+    if hasattr(dt, "estimate_size"):
+        return dt.estimate_size()
     if hasattr(dt, "nbytes"):
         return max(sys.getsizeof(dt), dt.nbytes)
     if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,45 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
     return sys.getsizeof(dt)


+def estimate_pandas_size(
+    df_obj, max_samples: int = 10, min_sample_rows: int = 100
+) -> int:
+    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
+        return sys.getsizeof(df_obj)
+
+    from .dataframe.arrays import ArrowDtype
+
+    def _is_fast_dtype(dtype):
+        if isinstance(dtype, np.dtype):
+            return np.issubdtype(dtype, np.number)
+        else:
+            return isinstance(dtype, ArrowDtype)
+
+    dtypes = []
+    if isinstance(df_obj, pd.DataFrame):
+        dtypes.extend(df_obj.dtypes)
+        index_obj = df_obj.index
+    elif isinstance(df_obj, pd.Series):
+        dtypes.append(df_obj.dtype)
+        index_obj = df_obj.index
+    else:
+        index_obj = df_obj
+
+    # handle a possible MultiIndex
+    if hasattr(index_obj, "dtypes"):
+        dtypes.extend(index_obj.dtypes)
+    else:
+        dtypes.append(index_obj.dtype)
+
+    if all(_is_fast_dtype(dtype) for dtype in dtypes):
+        return sys.getsizeof(df_obj)
+
+    indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
+    iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
+    sample_size = sys.getsizeof(iloc[indices])
+    return sample_size * len(df_obj) // max_samples
+
+
 def build_fetch_chunk(
     chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
 ) -> ChunkType:
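Taken together, `calc_data_size` now dispatches in three tiers: pandas objects route through `estimate_pandas_size`, objects exposing an `estimate_size()` method (such as `GroupByWrapper` above) delegate to it, and everything else keeps the existing `nbytes`/`getsizeof` fallbacks. A usage sketch, assuming the patch is applied:

```python
import numpy as np
import pandas as pd

from mars.lib.groupby_wrapper import wrapped_groupby
from mars.utils import calc_data_size

df = pd.DataFrame(
    {
        "a": np.random.choice(["x", "yy", "zzz"], size=(1000,)),
        "b": np.random.rand(1000),
    }
)

# pd.DataFrame / pd.Series -> estimate_pandas_size (sampled for object dtypes)
print(calc_data_size(df))

# GroupByWrapper defines estimate_size(), which sizes the wrapped frame
# and its index through estimate_pandas_size
print(calc_data_size(wrapped_groupby(df, level=0)))

# plain ndarrays still take the nbytes branch
print(calc_data_size(np.random.rand(100)))
```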