diff --git a/mars/lib/groupby_wrapper.py b/mars/lib/groupby_wrapper.py
index 1968d32980..84d1b6a493 100644
--- a/mars/lib/groupby_wrapper.py
+++ b/mars/lib/groupby_wrapper.py
@@ -20,6 +20,7 @@
 import pandas as pd
 from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy
 
+from ..utils import estimate_dataframe_size
 from .version import parse as parse_version
 
 _HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,11 @@ def __sizeof__(self):
             getattr(self.groupby_obj.grouper, "_cache", None)
         )
 
+    def estimate_size(self):
+        return estimate_dataframe_size(self.obj) + estimate_dataframe_size(
+            self.obj.index
+        )
+
     def __reduce__(self):
         return (
             type(self).from_tuple,
diff --git a/mars/lib/tests/test_lib.py b/mars/lib/tests/test_lib.py
index 3d538a3e64..e3c242081e 100644
--- a/mars/lib/tests/test_lib.py
+++ b/mars/lib/tests/test_lib.py
@@ -19,7 +19,7 @@
 import numpy as np
 
 from ...tests.core import assert_groupby_equal
-from ...utils import calc_data_size
+from ...utils import calc_data_size, estimate_dataframe_size
 from ..groupby_wrapper import wrapped_groupby
 
 
@@ -42,6 +42,7 @@ def test_groupby_wrapper():
     assert grouped.is_frame is True
     assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
     assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
+    assert grouped.estimate_size() > estimate_dataframe_size(grouped.groupby_obj)
 
     grouped = conv_func(wrapped_groupby(df, level=0).C)
     assert_groupby_equal(grouped, df.groupby(level=0).C)
diff --git a/mars/utils.py b/mars/utils.py
index 450e792930..34d9adb558 100644
--- a/mars/utils.py
+++ b/mars/utils.py
@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
         return sum(calc_data_size(c) for c in dt)
 
     shape = getattr(dt, "shape", None) or shape
-    if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
-        return sys.getsizeof(dt)
+    if isinstance(dt, (pd.DataFrame, pd.Series)):
+        return estimate_dataframe_size(dt)
+    if hasattr(dt, "estimate_size"):
+        return dt.estimate_size()
     if hasattr(dt, "nbytes"):
         return max(sys.getsizeof(dt), dt.nbytes)
     if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,20 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
     return sys.getsizeof(dt)
 
 
+def estimate_dataframe_size(
+    df_obj, max_samples: int = 10, min_sample_rows: int = 100
+) -> int:
+    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
+        return sys.getsizeof(df_obj)
+    else:
+        indices = np.sort(
+            np.random.choice(len(df_obj), size=max_samples, replace=False)
+        )
+        iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
+        sample_size = sys.getsizeof(iloc[indices])
+        return sample_size * len(df_obj) // max_samples
+
+
 def build_fetch_chunk(
     chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
 ) -> ChunkType:
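
For context, the patch replaces the exact `sys.getsizeof`-based measurement in `calc_data_size` with a sampling estimator: instead of touching every row, it measures `max_samples` randomly chosen rows and scales the result linearly to the full length. Below is a minimal standalone sketch of that behaviour; the copy of the estimator, the example frame, and the printed comparison are illustrative assumptions, not part of the patch itself.

import sys

import numpy as np
import pandas as pd


# Illustrative copy of the estimator added to mars/utils.py in this patch.
def estimate_dataframe_size(df_obj, max_samples=10, min_sample_rows=100) -> int:
    # Small objects (and RangeIndex, whose footprint is constant) are measured exactly.
    if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
        return sys.getsizeof(df_obj)
    # Otherwise measure a small random sample of rows and scale linearly.
    indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
    iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
    sample_size = sys.getsizeof(iloc[indices])
    return sample_size * len(df_obj) // max_samples


if __name__ == "__main__":
    # Hypothetical data: an object-dtype column forces an exact deep measurement
    # (memory_usage(deep=True)) to walk every Python string.
    df = pd.DataFrame(
        {"a": np.random.rand(100_000), "b": [f"row-{i}" for i in range(100_000)]}
    )
    print("sampled estimate:", estimate_dataframe_size(df))
    print("exact deep size :", int(df.memory_usage(deep=True).sum()))

Because only a handful of rows are materialised, the estimate costs roughly the same regardless of row count; the trade-off is higher variance for frames whose per-row sizes are very uneven.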