diff --git a/mars/core/base.py b/mars/core/base.py index 0a8f716126..9c84bfd1ae 100644 --- a/mars/core/base.py +++ b/mars/core/base.py @@ -15,7 +15,7 @@ from functools import wraps from typing import Dict -from ..serialization.core import Placeholder, short_id +from ..serialization.core import Placeholder, fast_id from ..serialization.serializables import Serializable, StringField from ..serialization.serializables.core import SerializableSerializer from ..utils import tokenize @@ -123,7 +123,7 @@ def buffered_base(func): def wrapped(self, obj: Base, context: Dict): obj_id = (obj.key, obj.id) if obj_id in context: - return Placeholder(short_id(context[obj_id])) + return Placeholder(fast_id(context[obj_id])) else: context[obj_id] = obj return func(self, obj, context) diff --git a/mars/oscar/backends/message.pyx b/mars/oscar/backends/message.pyx index 390009a276..8d56da1703 100644 --- a/mars/oscar/backends/message.pyx +++ b/mars/oscar/backends/message.pyx @@ -515,7 +515,7 @@ class DeserializeMessageFailed(RuntimeError): cdef class MessageSerializer(Serializer): - serializer_id = 56951 + serializer_id = 32105 cpdef serial(self, object obj, dict context): cdef _MessageBase msg = <_MessageBase>obj diff --git a/mars/oscar/batch.py b/mars/oscar/batch.py index e9c4106ecb..18397250b9 100644 --- a/mars/oscar/batch.py +++ b/mars/oscar/batch.py @@ -143,7 +143,9 @@ def _gen_args_kwargs_list(delays): async def _async_batch(self, *delays): # when there is only one call in batch, calling one-pass method # will be more efficient - if len(delays) == 1: + if len(delays) == 0: + return [] + elif len(delays) == 1: d = delays[0] return [await self._async_call(*d.args, **d.kwargs)] elif self.batch_func: @@ -162,7 +164,9 @@ async def _async_batch(self, *delays): return await asyncio.gather(*tasks) def _sync_batch(self, *delays): - if self.batch_func: + if delays == 0: + return [] + elif self.batch_func: args_list, kwargs_list = self._gen_args_kwargs_list(delays) return self.batch_func(args_list, kwargs_list) else: diff --git a/mars/oscar/tests/test_batch.py b/mars/oscar/tests/test_batch.py index 88ca560d39..3abbec354f 100644 --- a/mars/oscar/tests/test_batch.py +++ b/mars/oscar/tests/test_batch.py @@ -146,6 +146,11 @@ def method(self, args_list, kwargs_list): if use_async: assert asyncio.iscoroutinefunction(TestClass.method) + test_inst = TestClass() + ret = test_inst.method.batch() + ret = await ret if use_async else ret + assert ret == [] + test_inst = TestClass() ret = test_inst.method.batch(test_inst.method.delay(12)) ret = await ret if use_async else ret diff --git a/mars/serialization/__init__.py b/mars/serialization/__init__.py index 2d0e52a764..58dd61d2e4 100644 --- a/mars/serialization/__init__.py +++ b/mars/serialization/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. from .aio import AioSerializer, AioDeserializer -from .core import serialize, deserialize, Serializer +from .core import serialize, serialize_with_spawn, deserialize, Serializer from . import arrow, cuda, numpy, scipy, mars_objects, ray, exception diff --git a/mars/serialization/aio.py b/mars/serialization/aio.py index b5340069df..3ef3dfd07b 100644 --- a/mars/serialization/aio.py +++ b/mars/serialization/aio.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import struct from io import BytesIO from typing import Any @@ -20,12 +21,13 @@ import numpy as np from ..utils import lazy_import -from .core import serialize, deserialize +from .core import serialize_with_spawn, deserialize cupy = lazy_import("cupy", globals=globals()) cudf = lazy_import("cudf", globals=globals()) DEFAULT_SERIALIZATION_VERSION = 1 +DEFAULT_SPAWN_THRESHOLD = 100 BUFFER_SIZES_NAME = "buf_sizes" @@ -34,8 +36,10 @@ def __init__(self, obj: Any, compress=0): self._obj = obj self._compress = compress - def _get_buffers(self): - headers, buffers = serialize(self._obj) + async def _get_buffers(self): + headers, buffers = await serialize_with_spawn( + self._obj, spawn_threshold=DEFAULT_SPAWN_THRESHOLD + ) def _is_cuda_buffer(buf): # pragma: no cover if cupy is not None and cudf is not None: @@ -78,7 +82,7 @@ def _is_cuda_buffer(buf): # pragma: no cover return out_buffers async def run(self): - return self._get_buffers() + return await self._get_buffers() MALFORMED_MSG = """\ @@ -123,8 +127,13 @@ async def _get_obj(self): buffer_sizes = header[0].pop(BUFFER_SIZES_NAME) # get buffers buffers = [await self._readexactly(size) for size in buffer_sizes] + # get num of objs + num_objs = header[0].get("_N", 0) - return deserialize(header, buffers) + if num_objs <= DEFAULT_SPAWN_THRESHOLD: + return deserialize(header, buffers) + else: + return await asyncio.to_thread(deserialize, header, buffers) async def run(self): return await self._get_obj() diff --git a/mars/serialization/core.pyi b/mars/serialization/core.pyi index fa5efd9c64..3ee4022615 100644 --- a/mars/serialization/core.pyi +++ b/mars/serialization/core.pyi @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from concurrent.futures import Executor from typing import Any, Callable, Dict, List, Tuple def buffered(func: Callable) -> Callable: ... -def short_id(obj: Any) -> int: ... +def fast_id(obj: Any) -> int: ... class Serializer: serializer_id: int @@ -42,4 +43,10 @@ class Placeholder: def __eq__(self, other): ... def serialize(obj: Any, context: Dict = None): ... +async def serialize_with_spawn( + obj: Any, + context: Dict = None, + spawn_threshold: int = 100, + executor: Executor = None, +): ... def deserialize(headers: List, buffers: List, context: Dict = None): ... diff --git a/mars/serialization/core.pyx b/mars/serialization/core.pyx index b3ae7b3d0d..47d002674c 100644 --- a/mars/serialization/core.pyx +++ b/mars/serialization/core.pyx @@ -1,3 +1,4 @@ +# distutils: language = c++ # Copyright 1999-2022 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,12 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import datetime import inspect import sys from cpython cimport PyObject from functools import partial, wraps -from libc.stdint cimport uint32_t, int64_t, uintptr_t +from libc.stdint cimport int32_t, uint32_t, int64_t, uint64_t, uintptr_t +from libcpp.unordered_map cimport unordered_map from typing import Any, Dict, List import numpy as np @@ -45,6 +48,9 @@ cdef TypeDispatcher _serial_dispatcher = TypeDispatcher() cdef dict _deserializers = dict() cdef uint32_t _MAX_STR_PRIMITIVE_LEN = 1024 +# prime modulus for serializer ids +# use the largest prime number smaller than 32767 +cdef int32_t _SERIALIZER_ID_PRIME = 32749 cdef class Serializer: @@ -136,7 +142,7 @@ cdef class Serializer: @classmethod def calc_default_serializer_id(cls): - return tokenize_int(f"{cls.__module__}.{cls.__qualname__}") & 0x7fffffff + return tokenize_int(f"{cls.__module__}.{cls.__qualname__}") % _SERIALIZER_ID_PRIME @classmethod def register(cls, obj_type): @@ -149,7 +155,10 @@ cdef class Serializer: # inherited serializer_id not acceptable cls.serializer_id = cls.calc_default_serializer_id() _serial_dispatcher.register(obj_type, inst) - _deserializers[cls.serializer_id] = inst + if _deserializers.get(cls.serializer_id) is not None: + assert type(_deserializers[cls.serializer_id]) is cls + else: + _deserializers[cls.serializer_id] = inst @classmethod def unregister(cls, obj_type): @@ -157,14 +166,13 @@ cdef class Serializer: _deserializers.pop(cls.serializer_id, None) -cdef inline uint32_t _short_id(object obj) nogil: - cdef void* ptr = obj - return (ptr) & 0xffffffff +cdef inline uint64_t _fast_id(object obj) nogil: + return obj -def short_id(obj): - """Short representation of id() used for serialization""" - return _short_id(obj) +def fast_id(obj): + """C version of id() used for serialization""" + return _fast_id(obj) def buffered(func): @@ -173,11 +181,11 @@ def buffered(func): """ @wraps(func) def wrapped(self, obj: Any, dict context): - cdef uint32_t short_id = _short_id(obj) - if short_id in context: - return Placeholder(_short_id(obj)) + cdef uint64_t obj_id = _fast_id(obj) + if obj_id in context: + return Placeholder(_fast_id(obj)) else: - context[short_id] = obj + context[obj_id] = obj return func(self, obj, context) return wrapped @@ -223,8 +231,8 @@ cdef class PickleSerializer(Serializer): serializer_id = 0 cpdef serial(self, obj: Any, dict context): - cdef uint32_t obj_id - obj_id = _short_id(obj) + cdef uint64_t obj_id + obj_id = _fast_id(obj) if obj_id in context: return Placeholder(obj_id) context[obj_id] = obj @@ -265,8 +273,8 @@ cdef class BytesSerializer(Serializer): serializer_id = 2 cpdef serial(self, obj: Any, dict context): - cdef uint32_t obj_id - obj_id = _short_id(obj) + cdef uint64_t obj_id + obj_id = _fast_id(obj) if obj_id in context: return Placeholder(obj_id) context[obj_id] = obj @@ -281,8 +289,8 @@ cdef class StrSerializer(Serializer): serializer_id = 3 cpdef serial(self, obj: Any, dict context): - cdef uint32_t obj_id - obj_id = _short_id(obj) + cdef uint64_t obj_id + obj_id = _fast_id(obj) if obj_id in context: return Placeholder(obj_id) context[obj_id] = obj @@ -329,8 +337,8 @@ cdef class CollectionSerializer(Serializer): return (obj_list, idx_to_propagate, obj_type), obj_to_propagate, False cpdef serial(self, obj: Any, dict context): - cdef uint32_t obj_id - obj_id = _short_id(obj) + cdef uint64_t obj_id + obj_id = _fast_id(obj) if obj_id in context: return Placeholder(obj_id) context[obj_id] = obj @@ -401,11 +409,11 @@ cdef class DictSerializer(CollectionSerializer): _inspected_inherits = set() cpdef serial(self, obj: Any, dict context): - cdef uint32_t obj_id + cdef uint64_t obj_id cdef tuple key_obj, value_obj cdef list key_bufs, value_bufs - obj_id = _short_id(obj) + obj_id = _fast_id(obj) if obj_id in context: return Placeholder(obj_id) context[obj_id] = obj @@ -480,10 +488,10 @@ cdef class Placeholder: The object records object identifier and keeps callbacks to replace itself in parent objects. """ - cdef public uint32_t id + cdef public uint64_t id cdef public list callbacks - def __init__(self, uint32_t id_): + def __init__(self, uint64_t id_): self.id = id_ self.callbacks = [] @@ -506,14 +514,10 @@ cdef class PlaceholderSerializer(Serializer): serializer_id = 7 cpdef serial(self, obj: Any, dict context): - return ((obj).id,), [], True + return (), [], True cpdef deserial(self, tuple serialized, dict context, list subs): - obj_id = serialized[0] - try: - return context[obj_id] - except KeyError: - return Placeholder(obj_id) + return Placeholder(0) PickleSerializer.register(object) @@ -538,12 +542,18 @@ cdef class _SerialStackItem: self.subs_serialized = [] +cdef class _IdContextHolder: + cdef unordered_map[uint64_t, uint64_t] d + + cdef int _COMMON_HEADER_LEN = 4 -cdef tuple _serial_single(obj, dict context): +cdef tuple _serial_single( + obj, dict context, _IdContextHolder id_context_holder +): """Serialize single object and return serialized tuples""" - cdef uint32_t obj_id + cdef uint64_t obj_id, ordered_id cdef Serializer serializer cdef tuple common_header, serialized @@ -556,13 +566,16 @@ cdef tuple _serial_single(obj, dict context): if type(obj) is Placeholder: obj_id = (obj).id + ordered_id = id_context_holder.d[obj_id] else: - obj_id = _short_id(obj) + obj_id = _fast_id(obj) + ordered_id = id_context_holder.d.size() + id_context_holder.d[obj_id] = ordered_id # REMEMBER to change _COMMON_HEADER_LEN when content of # this header changed common_header = ( - serializer.serializer_id, obj_id, len(subs), final + serializer.serializer_id, ordered_id, len(subs), final ) break else: @@ -571,40 +584,27 @@ cdef tuple _serial_single(obj, dict context): return common_header + serialized, subs, final -def serialize(obj, dict context = None): - """ - Serialize an object and return a header and buffers. - Buffers are intended for zero-copy data manipulation. +class _SerializeObjectOverflow(Exception): + def __init__(self, tuple cur_serialized, int num_total_serialized): + super(_SerializeObjectOverflow, self).__init__(cur_serialized) + self.cur_serialized = cur_serialized + self.num_total_serialized = num_total_serialized - Parameters - ---------- - obj: Any - Object to serialize - context: - Serialization context for instantiation of Placeholder - objects - Returns - ------- - result: Tuple[Tuple, List] - Picklable header and buffers - """ - cdef list serial_stack = [] +cpdef object _serialize_with_stack( + list serial_stack, + tuple serialized, + dict context, + _IdContextHolder id_context_holder, + list result_bufs_list, + int64_t num_overflow = 0, + int64_t num_total_serialized = 0, +): cdef _SerialStackItem stack_item - cdef list result_bufs_list = [] - cdef tuple serialized cdef list subs cdef bint final - cdef int64_t num_serialized - - context = context if context is not None else dict() - serialized, subs, final = _serial_single(obj, context) - if final or not subs: - # marked as a leaf node, return directly - return ({}, serialized), subs - - serial_stack.append(_SerialStackItem(serialized, subs)) - serialized = None + cdef int64_t num_sub_serialized + cdef bint is_resume = num_total_serialized > 0 while serial_stack: stack_item = serial_stack[-1] @@ -612,17 +612,19 @@ def serialize(obj, dict context = None): # have previously-serialized results, record first stack_item.subs_serialized.append(serialized) - num_serialized = len(stack_item.subs_serialized) - if len(stack_item.subs) == num_serialized: + num_sub_serialized = len(stack_item.subs_serialized) + if len(stack_item.subs) == num_sub_serialized: # all subcomponents serialized, serialization of current is done # and we can move to the parent object serialized = stack_item.serialized + tuple(stack_item.subs_serialized) + num_total_serialized += 1 serial_stack.pop() else: # serialize next subcomponent at stack top serialized, subs, final = _serial_single( - stack_item.subs[num_serialized], context + stack_item.subs[num_sub_serialized], context, id_context_holder ) + num_total_serialized += 1 if final or not subs: # the subcomponent is a leaf if subs: @@ -635,8 +637,112 @@ def serialize(obj, dict context = None): # note that the serialized header should not be recorded # as we are now processing the subcomponent itself serialized = None + if 0 < num_overflow < num_total_serialized: + raise _SerializeObjectOverflow(serialized, num_total_serialized) + # we keep an empty dict for extra metas required for other modules - return ({}, serialized), result_bufs_list + if is_resume: + # returns num of deserialized objects when resumed + extra_meta = {"_N": num_total_serialized} + else: + # otherwise does not record the number to reduce result size + extra_meta = {} + return (extra_meta, serialized), result_bufs_list + + +def serialize(obj, dict context = None): + """ + Serialize an object and return a header and buffers. + Buffers are intended for zero-copy data manipulation. + + Parameters + ---------- + obj: Any + Object to serialize + context: + Serialization context for instantiation of Placeholder + objects + + Returns + ------- + result: Tuple[Tuple, List] + Picklable header and buffers + """ + cdef list serial_stack = [] + cdef list result_bufs_list = [] + cdef tuple serialized + cdef list subs + cdef bint final + cdef _IdContextHolder id_context_holder = _IdContextHolder() + + context = context if context is not None else dict() + serialized, subs, final = _serial_single(obj, context, id_context_holder) + if final or not subs: + # marked as a leaf node, return directly + return ({}, serialized), subs + + serial_stack.append(_SerialStackItem(serialized, subs)) + return _serialize_with_stack( + serial_stack, None, context, id_context_holder, result_bufs_list + ) + + +async def serialize_with_spawn( + obj, dict context = None, int spawn_threshold = 100, object executor = None +): + """ + Serialize an object and return a header and buffers. + Buffers are intended for zero-copy data manipulation. + + Parameters + ---------- + obj: Any + Object to serialize + context: Dict + Serialization context for instantiation of Placeholder + objects + spawn_threshold: int + Threshold to spawn into a ThreadPoolExecutor + executor: ThreadPoolExecutor + ThreadPoolExecutor to spawn rest serialization into + + Returns + ------- + result: Tuple[Tuple, List] + Picklable header and buffers + """ + cdef list serial_stack = [] + cdef list result_bufs_list = [] + cdef tuple serialized + cdef list subs + cdef bint final + cdef _IdContextHolder id_context_holder = _IdContextHolder() + + context = context if context is not None else dict() + serialized, subs, final = _serial_single(obj, context, id_context_holder) + if final or not subs: + # marked as a leaf node, return directly + return ({}, serialized), subs + + serial_stack.append(_SerialStackItem(serialized, subs)) + + try: + result = _serialize_with_stack( + serial_stack, None, context, id_context_holder, result_bufs_list, spawn_threshold + ) + except _SerializeObjectOverflow as ex: + result = await asyncio.get_running_loop().run_in_executor( + executor, + _serialize_with_stack, + serial_stack, + ex.cur_serialized, + context, + id_context_holder, + result_bufs_list, + 0, + ex.num_total_serialized, + ) + return result cdef class _DeserialStackItem: @@ -659,6 +765,12 @@ cdef _deserial_single(tuple serialized, dict context, list subs): serializer = _deserializers[serializer_id] res = serializer.deserial(serialized[_COMMON_HEADER_LEN:], context, subs) + if type(res) is Placeholder: + try: + res = context[obj_id] + except KeyError: + (res).id = obj_id + # get previously-recorded context values context_val, context[obj_id] = context.get(obj_id), res # if previously recorded object is a Placeholder, diff --git a/mars/serialization/tests/test_serial.py b/mars/serialization/tests/test_serial.py index 51c3bd4716..d36fe90a6c 100644 --- a/mars/serialization/tests/test_serial.py +++ b/mars/serialization/tests/test_serial.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading from collections import defaultdict, OrderedDict from typing import Any, Dict, List, Tuple @@ -31,7 +32,7 @@ from ...lib.sparse import SparseMatrix from ...tests.core import require_cupy, require_cudf from ...utils import lazy_import -from .. import serialize, deserialize +from .. import serialize, serialize_with_spawn, deserialize from ..core import Placeholder, ListSerializer cupy = lazy_import("cupy", globals=globals()) @@ -219,8 +220,8 @@ def test_mars_sparse(): assert (val.spmatrix != deserial.spmatrix).nnz == 0 -class MockSerializer(ListSerializer): - serializer_id = 145921 +class MockSerializerForErrors(ListSerializer): + serializer_id = 25951 raises = False def on_deserial_error( @@ -255,8 +256,8 @@ def __setstate__(self, state): def test_deserial_errors(): try: - MockSerializer.raises = False - MockSerializer.register(CustomList) + MockSerializerForErrors.raises = False + MockSerializerForErrors.register(CustomList) # error of leaf object is raised obj = [1, [[3, UnpickleWithError()]]] @@ -269,7 +270,7 @@ def test_deserial_errors(): deserialize(*serialize(obj)) assert isinstance(exc_info.value.__cause__, ValueError) - MockSerializer.raises = True + MockSerializerForErrors.raises = True # error of non-leaf object is raised obj = [CustomList([[1], [[2]]])] @@ -282,4 +283,32 @@ def test_deserial_errors(): deserialize(*serialize(obj)) assert isinstance(exc_info.value.__cause__, TypeError) finally: - MockSerializer.unregister(CustomList) + MockSerializerForErrors.unregister(CustomList) + + +class MockSerializerForSpawn(ListSerializer): + thread_calls = defaultdict(lambda: 0) + + def serial(self, obj: Any, context: Dict): + self.thread_calls[threading.current_thread().ident] += 1 + return super().serial(obj, context) + + +@pytest.mark.asyncio +async def test_spawn_threshold(): + try: + assert 0 == deserialize(*(await serialize_with_spawn(0))) + + MockSerializerForSpawn.register(CustomList) + obj = [CustomList([i]) for i in range(200)] + serialized = await serialize_with_spawn(obj, spawn_threshold=100) + assert serialized[0][0]["_N"] == 201 + deserialized = deserialize(*serialized) + for s, d in zip(obj, deserialized): + assert s[0] == d[0] + + calls = MockSerializerForSpawn.thread_calls + assert sum(calls.values()) == 200 + assert calls[threading.current_thread().ident] == 101 + finally: + MockSerializerForSpawn.unregister(CustomList) diff --git a/setup.cfg b/setup.cfg index 90e006078b..4826501bec 100644 --- a/setup.cfg +++ b/setup.cfg @@ -200,4 +200,4 @@ exclude = [codespell] ignore-words-list = hist,rcall,fpr,ser,nd,inout,ot,Ba,ba,asend,hart,coo,splitted,datas,fro -skip = .idea,.git,./build,./docs/build,./mars/lib,node_modules,static,generated,*.po,*.ts,*.json,*.c,*.cfg +skip = .idea,.git,./build,./docs/build,./mars/lib,node_modules,static,generated,*.po,*.ts,*.json,*.c,*.cpp,*.cfg