[BACKPORT] Reduce estimation time cost (#2577) #2607

Merged 1 commit on Dec 8, 2021
25 changes: 15 additions & 10 deletions mars/core/operand/core.py
@@ -335,20 +335,22 @@ def post_execute(cls, ctx: Union[dict, Context], op: OperandType):
def estimate_size(cls, ctx: dict, op: OperandType):
from .fetch import FetchShuffle

exec_size = 0
# when sizes of all outputs are deterministic, return directly
outputs = op.outputs
pure_dep_keys = set(
inp.key
for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
if is_dep
)
if all(
not c.is_sparse() and hasattr(c, "nbytes") and not np.isnan(c.nbytes)
for c in outputs
):
for out in outputs:
ctx[out.key] = (out.nbytes, out.nbytes)
return

pure_dep_keys = set(
inp.key
for inp, is_dep in zip(op.inputs or (), op.pure_depends or ())
if is_dep
)
exec_sizes = [0]
for inp in op.inputs or ():
if inp.key in pure_dep_keys:
continue
@@ -361,13 +363,16 @@ def estimate_size(cls, ctx: dict, op: OperandType):
# execution size of a specific data chunk may be
# larger than stored type due to objects
for key, shape in keys_and_shapes:
exec_size += ctx[key][0]
exec_sizes.append(ctx[key][0])
except KeyError:
if not op.sparse:
inp_size = calc_data_size(inp)
if not np.isnan(inp_size):
exec_size += inp_size
exec_size = int(exec_size)
exec_sizes.append(inp_size)
if any(c.is_sparse() for c in op.inputs):
exec_size = sum(exec_sizes)
else:
exec_size = max(exec_sizes)

total_out_size = 0
chunk_sizes = dict()
@@ -408,7 +413,7 @@ def estimate_size(cls, ctx: dict, op: OperandType):
max_sparse_size = np.nan
if not np.isnan(max_sparse_size):
result_size = min(result_size, max_sparse_size)
ctx[out.key] = (result_size, exec_size * memory_scale // len(outputs))
ctx[out.key] = (result_size, int(exec_size * memory_scale // len(outputs)))

@classmethod
def concat_tileable_chunks(cls, tileable: TileableType):
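The hunk above collects per-input sizes into exec_sizes and reduces them with max() when no input is sparse, only falling back to sum() when sparse inputs are involved, instead of always summing. A standalone sketch of that reduction rule (illustrative only, not Mars code):

from typing import Sequence


def aggregate_exec_size(input_sizes: Sequence[int], any_sparse: bool) -> int:
    # mimic the reduction over per-input sizes in the patched estimate_size
    exec_sizes = [0]  # seed with 0 so operands without inputs estimate to 0
    exec_sizes.extend(input_sizes)
    if any_sparse:
        # sparse chunks may densify during execution, so keep the conservative total
        return sum(exec_sizes)
    # with dense inputs, the largest single input is used as the execution bound
    return max(exec_sizes)


# three dense 100 MB inputs now estimate to 100 MB instead of 300 MB
assert aggregate_exec_size([100, 100, 100], any_sparse=False) == 100
assert aggregate_exec_size([100, 100, 100], any_sparse=True) == 300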
4 changes: 4 additions & 0 deletions mars/lib/groupby_wrapper.py
@@ -20,6 +20,7 @@
import pandas as pd
from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy

from ..utils import estimate_pandas_size
from .version import parse as parse_version

_HAS_SQUEEZE = parse_version(pd.__version__) < parse_version("1.1.0")
@@ -124,6 +125,9 @@ def __sizeof__(self):
getattr(self.groupby_obj.grouper, "_cache", None)
)

def estimate_size(self):
return estimate_pandas_size(self.obj) + estimate_pandas_size(self.obj.index)

def __reduce__(self):
return (
type(self).from_tuple,
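With the new estimate_size() method, a wrapped groupby reports its footprint from the underlying frame plus its index through estimate_pandas_size, rather than leaving calc_data_size to fall back on sys.getsizeof. A rough usage sketch (assumes a Mars development install; the data and column names are made up):

import sys

import numpy as np
import pandas as pd

from mars.lib.groupby_wrapper import wrapped_groupby

df = pd.DataFrame(
    {
        "a": np.random.choice(["abcd", "def", "gh"], size=(10_000,)),
        "b": np.random.rand(10_000),
    }
)
grouped = wrapped_groupby(df, by="a")

# sampling-based estimate of the wrapped frame plus its index
print(grouped.estimate_size())
# the measurement calc_data_size previously relied on, for comparison
print(sys.getsizeof(grouped.groupby_obj))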
3 changes: 2 additions & 1 deletion mars/lib/tests/test_lib.py
@@ -19,7 +19,7 @@
import numpy as np

from ...tests.core import assert_groupby_equal
from ...utils import calc_data_size
from ...utils import calc_data_size, estimate_pandas_size
from ..groupby_wrapper import wrapped_groupby


@@ -42,6 +42,7 @@ def test_groupby_wrapper():
assert grouped.is_frame is True
assert sys.getsizeof(grouped) > sys.getsizeof(grouped.groupby_obj)
assert calc_data_size(grouped) > sys.getsizeof(grouped.groupby_obj)
assert grouped.estimate_size() > estimate_pandas_size(grouped.groupby_obj)

grouped = conv_func(wrapped_groupby(df, level=0).C)
assert_groupby_equal(grouped, df.groupby(level=0).C)
26 changes: 18 additions & 8 deletions mars/services/scheduling/worker/execution.py
@@ -207,19 +207,22 @@ async def _collect_input_sizes(
*(storage_api.get_infos.delay(k) for k in fetch_keys)
)

# compute memory quota size. when data located in shared memory, the cost
# should be differences between deserialized memory cost and serialized cost,
# otherwise we should take deserialized memory cost
for key, meta, infos in zip(fetch_keys, fetch_metas, data_infos):
level = functools.reduce(operator.or_, (info.level for info in infos))
if level & StorageLevel.MEMORY:
mem_cost = max(0, meta["memory_size"] - meta["store_size"])
else:
mem_cost = meta["memory_size"]
sizes[key] = (mem_cost, mem_cost)
sizes[key] = (meta["store_size"], mem_cost)

return sizes

@classmethod
def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
size_context = {k: (s, 0) for k, (s, _c) in input_sizes.items()}
size_context = dict(input_sizes.items())
graph = subtask.chunk_graph

key_to_ops = defaultdict(set)
@@ -243,7 +246,7 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):

visited_op_keys = set()
total_memory_cost = 0
max_memory_cost = 0
max_memory_cost = sum(calc_size for _, calc_size in size_context.values())
while key_stack:
key = key_stack.pop()
op = key_to_ops[key][0]
@@ -255,24 +258,31 @@ def _estimate_sizes(cls, subtask: Subtask, input_sizes: Dict):
total_memory_cost += calc_cost
max_memory_cost = max(total_memory_cost, max_memory_cost)

result_cost = sum(size_context[out.key][0] for out in op.outputs)
total_memory_cost += result_cost - calc_cost
if not isinstance(op, Fetch):
# when calculation result is stored, memory cost of calculation
# can be replaced with result memory cost
result_cost = sum(size_context[out.key][0] for out in op.outputs)
total_memory_cost += result_cost - calc_cost

visited_op_keys.add(op.key)
visited_op_keys.add(key)

for succ_op_key in op_key_graph.iter_successors(key):
pred_ref_count[succ_op_key] -= 1
if pred_ref_count[succ_op_key] == 0:
key_stack.append(succ_op_key)

for pred_op_key in op_key_graph.iter_predecessors(key):
succ_ref_count[pred_op_key] -= 1
if succ_ref_count[pred_op_key] == 0:
pred_op = key_to_ops[pred_op_key][0]
# when clearing fetches, subtract memory size, otherwise subtract store size
account_idx = 1 if isinstance(pred_op, Fetch) else 0
pop_result_cost = sum(
size_context.pop(out.key, (0, 0))[0]
size_context.pop(out.key, (0, 0))[account_idx]
for out in key_to_ops[pred_op_key][0].outputs
)
total_memory_cost -= pop_result_cost
return sum(t[1] for t in size_context.values()), max_memory_cost
return sum(t[0] for t in size_context.values()), max_memory_cost

@classmethod
def _check_cancelling(cls, subtask_info: SubtaskExecutionInfo):
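The quota bookkeeping above now carries a (store_size, mem_cost) pair per key: inputs already resident in shared memory only charge the difference between their deserialized and serialized footprints, inputs that must be loaded charge their full deserialized size, and Fetch outputs are later released by memory cost rather than store size. A standalone sketch of the per-input rule (illustrative values, not the actor code itself):

from typing import Dict, Tuple


def input_quota_entry(meta: Dict[str, int], in_shared_memory: bool) -> Tuple[int, int]:
    # build a (store_size, mem_cost) pair the way _collect_input_sizes now does
    if in_shared_memory:
        # already materialized in shared storage: only the extra cost of
        # deserialization counts against the memory quota
        mem_cost = max(0, meta["memory_size"] - meta["store_size"])
    else:
        # has to be loaded into worker memory in full
        mem_cost = meta["memory_size"]
    return meta["store_size"], mem_cost


# a 100-byte stored blob that occupies 160 bytes once deserialized
print(input_quota_entry({"store_size": 100, "memory_size": 160}, True))   # (100, 60)
print(input_quota_entry({"store_size": 100, "memory_size": 160}, False))  # (100, 160)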
50 changes: 49 additions & 1 deletion mars/services/scheduling/worker/tests/test_execution.py
@@ -21,11 +21,18 @@
from typing import Tuple

import numpy as np
import pandas as pd
import pytest

from ..... import oscar as mo
from ..... import remote as mr
from .....core import ChunkGraph, ChunkGraphBuilder, TileableGraph, TileableGraphBuilder
from .....core import (
ChunkGraph,
ChunkGraphBuilder,
TileableGraph,
TileableGraphBuilder,
OutputType,
)
from .....remote.core import RemoteFunction
from .....tensor.fetch import TensorFetch
from .....tensor.arithmetic import TensorTreeAdd
@@ -384,6 +391,47 @@ def delay_fun(delay, _inp1):
)


def test_estimate_size():
from ..execution import SubtaskExecutionActor
from .....dataframe.arithmetic import DataFrameAdd
from .....dataframe.fetch import DataFrameFetch
from .....dataframe.utils import parse_index

index_value = parse_index(pd.Int64Index([10, 20, 30]))

input1 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
[], _key="INPUT1", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
)
input2 = DataFrameFetch(output_types=[OutputType.series],).new_chunk(
[], _key="INPUT2", shape=(np.nan,), dtype=np.dtype("O"), index_value=index_value
)
result_chunk = DataFrameAdd(
axis=0, output_types=[OutputType.series], lhs=input1, rhs=input2
).new_chunk(
[input1, input2],
_key="ADD_RESULT",
shape=(np.nan,),
dtype=np.dtype("O"),
index_value=index_value,
)

chunk_graph = ChunkGraph([result_chunk])
chunk_graph.add_node(input1)
chunk_graph.add_node(input2)
chunk_graph.add_node(result_chunk)
chunk_graph.add_edge(input1, result_chunk)
chunk_graph.add_edge(input2, result_chunk)

input_sizes = {
"INPUT1": (1024, 1024),
"INPUT2": (1024, 1024),
}

subtask = Subtask("test_subtask", session_id="session_id", chunk_graph=chunk_graph)
result = SubtaskExecutionActor._estimate_sizes(subtask, input_sizes)
assert result[0] == 1024


@pytest.mark.asyncio
@pytest.mark.parametrize("actor_pool", [(1, False)], indirect=True)
async def test_cancel_without_kill(actor_pool):
38 changes: 36 additions & 2 deletions mars/tests/test_utils.py
@@ -188,8 +188,8 @@ def test_lazy_import():
old_sys_path = sys.path
mock_mod = textwrap.dedent(
"""
__version__ = '0.1.0b1'
""".strip()
__version__ = '0.1.0b1'
""".strip()
)

temp_dir = tempfile.mkdtemp(prefix="mars-utils-test-")
@@ -482,6 +482,40 @@ def test_readable_size():
assert utils.readable_size(14354000000000) == "13.05T"


def test_estimate_pandas_size():
df1 = pd.DataFrame(np.random.rand(50, 10))
assert utils.estimate_pandas_size(df1) == sys.getsizeof(df1)

df2 = pd.DataFrame(np.random.rand(1000, 10))
assert utils.estimate_pandas_size(df2) == sys.getsizeof(df2)

df3 = pd.DataFrame(
{
"A": np.random.choice(["abcd", "def", "gh"], size=(1000,)),
"B": np.random.rand(1000),
"C": np.random.rand(1000),
}
)
assert utils.estimate_pandas_size(df3) != sys.getsizeof(df3)

s1 = pd.Series(np.random.rand(1000))
assert utils.estimate_pandas_size(s1) == sys.getsizeof(s1)

from ..dataframe.arrays import ArrowStringArray

array = ArrowStringArray(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
s2 = pd.Series(array)
assert utils.estimate_pandas_size(s2) == sys.getsizeof(s2)

s3 = pd.Series(np.random.choice(["abcd", "def", "gh"], size=(1000,)))
assert utils.estimate_pandas_size(s3) != sys.getsizeof(s3)

idx1 = pd.MultiIndex.from_arrays(
[np.arange(0, 1000), np.random.choice(["abcd", "def", "gh"], size=(1000,))]
)
assert utils.estimate_pandas_size(idx1) != sys.getsizeof(idx1)


@require_ray
def test_web_serialize_lambda():
register_ray_serializers()
45 changes: 43 additions & 2 deletions mars/utils.py
@@ -380,8 +380,10 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
return sum(calc_data_size(c) for c in dt)

shape = getattr(dt, "shape", None) or shape
if hasattr(dt, "memory_usage") or hasattr(dt, "groupby_obj"):
return sys.getsizeof(dt)
if isinstance(dt, (pd.DataFrame, pd.Series)):
return estimate_pandas_size(dt)
if hasattr(dt, "estimate_size"):
return dt.estimate_size()
if hasattr(dt, "nbytes"):
return max(sys.getsizeof(dt), dt.nbytes)
if hasattr(dt, "shape") and len(dt.shape) == 0:
@@ -404,6 +406,45 @@ def calc_data_size(dt: Any, shape: Tuple[int] = None) -> int:
return sys.getsizeof(dt)


def estimate_pandas_size(
df_obj, max_samples: int = 10, min_sample_rows: int = 100
) -> int:
if len(df_obj) <= min_sample_rows or isinstance(df_obj, pd.RangeIndex):
return sys.getsizeof(df_obj)

from .dataframe.arrays import ArrowDtype

def _is_fast_dtype(dtype):
if isinstance(dtype, np.dtype):
return np.issubdtype(dtype, np.number)
else:
return isinstance(dtype, ArrowDtype)

dtypes = []
if isinstance(df_obj, pd.DataFrame):
dtypes.extend(df_obj.dtypes)
index_obj = df_obj.index
elif isinstance(df_obj, pd.Series):
dtypes.append(df_obj.dtype)
index_obj = df_obj.index
else:
index_obj = df_obj

# handling possible MultiIndex
if hasattr(index_obj, "dtypes"):
dtypes.extend(index_obj.dtypes)
else:
dtypes.append(index_obj.dtype)

if all(_is_fast_dtype(dtype) for dtype in dtypes):
return sys.getsizeof(df_obj)

indices = np.sort(np.random.choice(len(df_obj), size=max_samples, replace=False))
iloc = df_obj if isinstance(df_obj, pd.Index) else df_obj.iloc
sample_size = sys.getsizeof(iloc[indices])
return sample_size * len(df_obj) // max_samples


def build_fetch_chunk(
chunk: ChunkType, input_chunk_keys: List[str] = None, **kwargs
) -> ChunkType:
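estimate_pandas_size returns sys.getsizeof directly for small objects and for purely numeric or Arrow-backed columns, and otherwise extrapolates from a random sample of max_samples rows, trading a little accuracy for a large speedup on large object-dtype data. A quick illustration (results and timings will vary from run to run):

import sys
import timeit

import numpy as np
import pandas as pd

from mars.utils import estimate_pandas_size

df = pd.DataFrame(
    {
        "key": np.random.choice(["abcd", "def", "gh"], size=(1_000_000,)),
        "val": np.random.rand(1_000_000),
    }
)

exact = sys.getsizeof(df)          # walks every Python string in the object column
approx = estimate_pandas_size(df)  # touches only ~10 sampled rows
print(exact, approx)

print(timeit.timeit(lambda: sys.getsizeof(df), number=3))
print(timeit.timeit(lambda: estimate_pandas_size(df), number=3))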