From fe007d36cf5959db7f74a7c287509d5bc9de9c19 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Thu, 14 Apr 2022 19:54:17 +0800 Subject: [PATCH 1/3] INITIAL pickle serial --- .github/workflows/run-tests.sh | 5 +- .gitignore | 1 + asv_bench/benchmarks/serialize.py | 39 +- mars/_utils.pxd | 12 + mars/_utils.pyx | 4 - mars/core/base.py | 15 +- mars/core/entity/core.py | 2 +- mars/core/entity/tileables.py | 2 - mars/core/graph/entity.py | 16 +- mars/core/operand/base.py | 10 +- mars/core/operand/fetch.py | 24 +- mars/oscar/backends/communication/utils.py | 4 +- mars/oscar/backends/message.py | 30 +- mars/serialization/__init__.py | 2 +- mars/serialization/aio.py | 10 +- mars/serialization/arrow.py | 18 +- mars/serialization/core.py | 479 ----------------- mars/serialization/core.pyi | 37 ++ mars/serialization/core.pyx | 585 +++++++++++++++++++++ mars/serialization/cuda.py | 22 +- mars/serialization/exception.py | 10 +- mars/serialization/mars_objects.py | 19 +- mars/serialization/numpy.py | 17 +- mars/serialization/ray.py | 18 +- mars/serialization/scipy.py | 50 +- mars/serialization/serializables/core.py | 98 ++-- mars/utils.py | 16 +- 27 files changed, 821 insertions(+), 724 deletions(-) delete mode 100644 mars/serialization/core.py create mode 100644 mars/serialization/core.pyi create mode 100644 mars/serialization/core.pyx diff --git a/.github/workflows/run-tests.sh b/.github/workflows/run-tests.sh index bc870a0b88..4fb6160626 100755 --- a/.github/workflows/run-tests.sh +++ b/.github/workflows/run-tests.sh @@ -4,7 +4,10 @@ if [ -n "$WITH_CYTHON" ]; then mkdir -p build export POOL_START_METHOD=forkserver - coverage run --rcfile=setup.cfg -m pytest $PYTEST_CONFIG_WITHOUT_COV mars/tests mars/core/graph + coverage run --rcfile=setup.cfg -m pytest $PYTEST_CONFIG_WITHOUT_COV \ + mars/tests \ + mars/core/graph \ + mars/serialization python .github/workflows/remove_tracer_errors.py coverage combine mv .coverage build/.coverage.non-oscar.file diff --git a/.gitignore b/.gitignore index c6486c01de..160a9333f5 100644 --- a/.gitignore +++ b/.gitignore @@ -86,6 +86,7 @@ mars/learn/cluster/*.c* mars/learn/utils/*.c* mars/lib/*.c* mars/oscar/**/*.c* +mars/serialization/*.c* # web bundle file mars/services/web/static diff --git a/asv_bench/benchmarks/serialize.py b/asv_bench/benchmarks/serialize.py index b969bf49e8..0e09a49da0 100644 --- a/asv_bench/benchmarks/serialize.py +++ b/asv_bench/benchmarks/serialize.py @@ -72,7 +72,7 @@ class MySerializable(Serializable): _dict_val = DictField("dict_val", FieldTypes.string, FieldTypes.bytes) -class SerializationSuite: +class SerializeSerializableSuite: def setup(self): children = [] for idx in range(1000): @@ -90,6 +90,12 @@ def setup(self): children.append(child) self.test_data = SerializableParent(children=children) + def time_serialize_deserialize(self): + deserialize(*serialize(self.test_data)) + + +class SerializeSubtaskSuite: + def setup(self): self.subtasks = [] for i in range(10000): subtask = Subtask( @@ -110,7 +116,13 @@ def setup(self): ) self.subtasks.append(subtask) - self.test_basic_serializable = [] + def time_pickle_serialize_deserialize_subtask(self): + deserialize(*cloudpickle.loads(cloudpickle.dumps(serialize(self.subtasks)))) + + +class SerializePrimitivesSuite: + def setup(self): + self.test_primitive_serializable = [] for i in range(10000): my_serializable = MySerializable( _bool_val=True, @@ -129,27 +141,24 @@ def setup(self): _tuple_val=("a", "b"), _dict_val={"a": b"bytes_value"}, ) - self.test_basic_serializable.append(my_serializable) - - 
self.test_list = list(range(100000)) - self.test_tuple = tuple(range(100000)) - self.test_dict = {i: i for i in range(100000)} - - def time_serialize_deserialize(self): - deserialize(*serialize(self.test_data)) + self.test_primitive_serializable.append(my_serializable) - def time_serialize_deserialize_basic(self): - deserialize(*serialize(self.test_basic_serializable)) + def time_serialize_deserialize_primitive(self): + deserialize(*serialize(self.test_primitive_serializable)) def time_pickle_serialize_deserialize_basic(self): deserialize( *cloudpickle.loads( - cloudpickle.dumps(serialize(self.test_basic_serializable)) + cloudpickle.dumps(serialize(self.test_primitive_serializable)) ) ) - def time_pickle_serialize_deserialize_subtask(self): - deserialize(*cloudpickle.loads(cloudpickle.dumps(serialize(self.subtasks)))) + +class SerializeContainersSuite: + def setup(self): + self.test_list = list(range(100000)) + self.test_tuple = tuple(range(100000)) + self.test_dict = {i: i for i in range(100000)} def time_pickle_serialize_deserialize_list(self): deserialize(*cloudpickle.loads(cloudpickle.dumps(serialize(self.test_list)))) diff --git a/mars/_utils.pxd b/mars/_utils.pxd index df9745f25a..a36ea0b63f 100644 --- a/mars/_utils.pxd +++ b/mars/_utils.pxd @@ -12,6 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. + +cdef class TypeDispatcher: + cdef dict _handlers + cdef dict _lazy_handlers + cdef dict _inherit_handlers + + cpdef void register(self, object type_, object handler) + cpdef void unregister(self, object type_) + cdef _reload_lazy_handlers(self) + cpdef get_handler(self, object type_) + + cpdef str to_str(s, encoding=*) cpdef bytes to_binary(s, encoding=*) cpdef unicode to_text(s, encoding=*) diff --git a/mars/_utils.pyx b/mars/_utils.pyx index e9e182afe4..40396ba7fc 100644 --- a/mars/_utils.pyx +++ b/mars/_utils.pyx @@ -84,10 +84,6 @@ cpdef unicode to_text(s, encoding='utf-8'): cdef class TypeDispatcher: - cdef dict _handlers - cdef dict _lazy_handlers - cdef dict _inherit_handlers - def __init__(self): self._handlers = dict() self._lazy_handlers = dict() diff --git a/mars/core/base.py b/mars/core/base.py index 2fa5697a73..0a8f716126 100644 --- a/mars/core/base.py +++ b/mars/core/base.py @@ -15,6 +15,7 @@ from functools import wraps from typing import Dict +from ..serialization.core import Placeholder, short_id from ..serialization.serializables import Serializable, StringField from ..serialization.serializables.core import SerializableSerializer from ..utils import tokenize @@ -117,16 +118,12 @@ def id(self): return self._id -def buffered(func): +def buffered_base(func): @wraps(func) def wrapped(self, obj: Base, context: Dict): obj_id = (obj.key, obj.id) if obj_id in context: - return { - "id": id(context[obj_id]), - "serializer": "ref", - "buf_num": 0, - }, [] + return Placeholder(short_id(context[obj_id])) else: context[obj_id] = obj return func(self, obj, context) @@ -135,9 +132,9 @@ def wrapped(self, obj: Base, context: Dict): class BaseSerializer(SerializableSerializer): - @buffered - def serialize(self, obj: Serializable, context: Dict): - return (yield from super().serialize(obj, context)) + @buffered_base + def serial(self, obj: Base, context: Dict): + return super().serial(obj, context) BaseSerializer.register(Base) diff --git a/mars/core/entity/core.py b/mars/core/entity/core.py index 1187a53991..6a27ac65d2 100644 --- a/mars/core/entity/core.py +++ b/mars/core/entity/core.py @@ -23,7 +23,7 @@ class 
EntityData(Base): - __slots__ = "__weakref__", "_siblings" + __slots__ = ("_siblings",) type_name = None # required fields diff --git a/mars/core/entity/tileables.py b/mars/core/entity/tileables.py index adafdbad73..b98a1f17d9 100644 --- a/mars/core/entity/tileables.py +++ b/mars/core/entity/tileables.py @@ -345,8 +345,6 @@ def detach(self, entity): class Tileable(Entity): - __slots__ = ("__weakref__",) - def __init__(self, data: TileableType = None, **kw): super().__init__(data=data, **kw) data = self._data diff --git a/mars/core/graph/entity.py b/mars/core/graph/entity.py index 92b541ea95..2301996785 100644 --- a/mars/core/graph/entity.py +++ b/mars/core/graph/entity.py @@ -13,7 +13,7 @@ # limitations under the License. from abc import ABCMeta, abstractmethod -from typing import List, Dict, Union, Iterable +from typing import List, Dict, Tuple, Union, Iterable from ...core import Tileable, Chunk from ...serialization.core import buffered @@ -133,19 +133,15 @@ def to_graph(self) -> Union[TileableGraph, ChunkGraph]: class GraphSerializer(SerializableSerializer): - serializer_name = "graph" - @buffered - def serialize(self, obj: Union[TileableGraph, ChunkGraph], context: Dict): + def serial(self, obj: Union[TileableGraph, ChunkGraph], context: Dict): serializable_graph = SerializableGraph.from_graph(obj) - return (yield from super().serialize(serializable_graph, context)) + return (), [serializable_graph], False - def deserialize( - self, header: Dict, buffers: List, context: Dict + def deserial( + self, serialized: Tuple, context: Dict, subs: List ) -> Union[TileableGraph, ChunkGraph]: - serializable_graph: SerializableGraph = ( - yield from super().deserialize(header, buffers, context) - ) + serializable_graph: SerializableGraph = subs[0] return serializable_graph.to_graph() diff --git a/mars/core/operand/base.py b/mars/core/operand/base.py index 4c4ee52ef3..cbaed9d903 100644 --- a/mars/core/operand/base.py +++ b/mars/core/operand/base.py @@ -161,11 +161,11 @@ class Operand(Base, OperatorLogicKeyGeneratorMixin, metaclass=OperandMetaclass): which should be the :class:`mars.tensor.core.TensorData`, :class:`mars.tensor.core.ChunkData` etc. """ - __slots__ = ("__weakref__",) attr_tag = "attr" _init_update_key_ = False _output_type_ = None _no_copy_attrs_ = Base._no_copy_attrs_ | {"scheduling_hint"} + _cache_primitive_serial = True sparse = BoolField("sparse", default=False) device = Int32Field("device", default=None) @@ -328,11 +328,13 @@ def on_input_modify(self, new_input): class OperandSerializer(SerializableSerializer): - serializer_name = "operand" + def serial(self, obj: Serializable, context: Dict): + res = super().serial(obj, context) + return res - def deserialize(self, header: Dict, buffers: List, context: Dict) -> Operand: + def deserial(self, serialized: Tuple, context: Dict, subs: List) -> Operand: # convert outputs back to weak-refs - operand: Operand = (yield from super().deserialize(header, buffers, context)) + operand: Operand = super().deserial(serialized, context, subs) for i, out in enumerate(operand._outputs): def cb(o, index): diff --git a/mars/core/operand/fetch.py b/mars/core/operand/fetch.py index e65a8b698b..f8e83bd153 100644 --- a/mars/core/operand/fetch.py +++ b/mars/core/operand/fetch.py @@ -12,10 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import cloudpickle - from ... 
import opcodes -from ...serialization.core import cached_pickle_dumps from ...serialization.serializables import FieldTypes, StringField, ListField from .base import Operand from .core import TileableOperandMixin @@ -47,21 +44,6 @@ def execute(cls, ctx, op): class FetchShuffle(Operand): _op_type_ = opcodes.FETCH_SHUFFLE - source_keys = ListField( - "source_keys", - FieldTypes.string, - on_serialize=cached_pickle_dumps, - on_deserialize=cloudpickle.loads, - ) - source_idxes = ListField( - "source_idxes", - FieldTypes.tuple(FieldTypes.uint64), - on_serialize=cached_pickle_dumps, - on_deserialize=cloudpickle.loads, - ) - source_mappers = ListField( - "source_mappers", - FieldTypes.uint16, - on_serialize=cached_pickle_dumps, - on_deserialize=cloudpickle.loads, - ) + source_keys = ListField("source_keys", FieldTypes.string) + source_idxes = ListField("source_idxes", FieldTypes.tuple(FieldTypes.uint64)) + source_mappers = ListField("source_mappers", FieldTypes.uint16) diff --git a/mars/oscar/backends/communication/utils.py b/mars/oscar/backends/communication/utils.py index e6e2369aa9..13bb94b25a 100644 --- a/mars/oscar/backends/communication/utils.py +++ b/mars/oscar/backends/communication/utils.py @@ -68,8 +68,8 @@ async def read_buffers(header: Dict, reader: StreamReader): CPBuffer = CPUnownedMemory = CPMemoryPointer = None # construct a empty cuda buffer and copy from host - is_cuda_buffers = header.get("is_cuda_buffers") - buffer_sizes = header.pop(BUFFER_SIZES_NAME) + is_cuda_buffers = header[0].get("is_cuda_buffers") + buffer_sizes = header[0].pop(BUFFER_SIZES_NAME) buffers = [] for is_cuda_buffer, buf_size in zip(is_cuda_buffers, buffer_sizes): diff --git a/mars/oscar/backends/message.py b/mars/oscar/backends/message.py index 14e7c7fd56..f97a2ee569 100644 --- a/mars/oscar/backends/message.py +++ b/mars/oscar/backends/message.py @@ -384,32 +384,20 @@ def __str__(self): class MessageSerializer(Serializer): - serializer_name = "actor_message" - @buffered - def serialize(self, obj: _MessageBase, context: Dict): + def serial(self, obj: _MessageBase, context: Dict): assert obj.protocol == 0, "only support protocol 0 for now" - message_class = type(obj) - to_serialize = [getattr(obj, slot) for slot in _get_slots(message_class)] - header, buffers = yield to_serialize - new_header = { - "message_class": message_class, - "message_id": obj.message_id, - "protocol": obj.protocol, - "attributes_header": header, - } - return new_header, buffers - - def deserialize(self, header: Dict, buffers: List, context: Dict): - protocol = header["protocol"] + message_cls = type(obj) + to_serialize = [getattr(obj, slot) for slot in _get_slots(message_cls)] + return (message_cls, obj.message_id, obj.protocol), [to_serialize], False + + def deserial(self, serialized: Tuple, context: Dict, subs: List): + message_cls, message_id, protocol = serialized assert protocol == 0, "only support protocol 0 for now" - message_id = header["message_id"] - message_class = header["message_class"] try: - serialized = yield header["attributes_header"], buffers - message = object.__new__(message_class) - for slot, val in zip(_get_slots(message_class), serialized): + message = object.__new__(message_cls) + for slot, val in zip(_get_slots(message_cls), subs[0]): setattr(message, slot, val) return message except pickle.UnpicklingError as e: # pragma: no cover diff --git a/mars/serialization/__init__.py b/mars/serialization/__init__.py index 60b415f74d..2d0e52a764 100644 --- a/mars/serialization/__init__.py +++ 
b/mars/serialization/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. from .aio import AioSerializer, AioDeserializer -from .core import serialize, deserialize +from .core import serialize, deserialize, Serializer from . import arrow, cuda, numpy, scipy, mars_objects, ray, exception diff --git a/mars/serialization/aio.py b/mars/serialization/aio.py index c417a98c80..cc57af8472 100644 --- a/mars/serialization/aio.py +++ b/mars/serialization/aio.py @@ -24,7 +24,7 @@ cupy = lazy_import("cupy", globals=globals()) cudf = lazy_import("cudf", globals=globals()) -DEFAULT_SERIALIZATION_VERSION = 0 +DEFAULT_SERIALIZATION_VERSION = 1 BUFFER_SIZES_NAME = "buf_sizes" @@ -51,10 +51,10 @@ def _is_cuda_buffer(buf): # pragma: no cover return False is_cuda_buffers = [_is_cuda_buffer(buf) for buf in buffers] - headers["is_cuda_buffers"] = np.array(is_cuda_buffers) + headers[0]["is_cuda_buffers"] = np.array(is_cuda_buffers) # add buffer lengths into headers - headers[BUFFER_SIZES_NAME] = [ + headers[0][BUFFER_SIZES_NAME] = [ getattr(buf, "nbytes", len(buf)) for buf in buffers ] header = pickle.dumps(headers) @@ -113,7 +113,7 @@ async def _get_obj_header_bytes(self): async def _get_obj(self): header = pickle.loads(await self._get_obj_header_bytes()) # get buffer size - buffer_sizes = header.pop(BUFFER_SIZES_NAME) + buffer_sizes = header[0].pop(BUFFER_SIZES_NAME) # get buffers buffers = [await self._readexactly(size) for size in buffer_sizes] @@ -127,7 +127,7 @@ async def get_size(self): header_bytes = await self._get_obj_header_bytes() header = pickle.loads(header_bytes) # get buffer size - buffer_sizes = header.pop(BUFFER_SIZES_NAME) + buffer_sizes = header[0].pop(BUFFER_SIZES_NAME) return 11 + len(header_bytes) + sum(buffer_sizes) async def get_header(self): diff --git a/mars/serialization/arrow.py b/mars/serialization/arrow.py index c983bc32b1..c2ac86c55b 100644 --- a/mars/serialization/arrow.py +++ b/mars/serialization/arrow.py @@ -26,29 +26,25 @@ class ArrowBatchSerializer(Serializer): - serializer_name = "arrow" - @buffered - def serialize(self, obj: pa_types, context: Dict): - header = {} - + def serial(self, obj: pa_types, context: Dict): sink = pa.BufferOutputStream() writer = pa.RecordBatchStreamWriter(sink, obj.schema) if isinstance(obj, pa.Table): - header["type"] = "Table" + batch_type = "T" writer.write_table(obj) else: - header["type"] = "Batch" + batch_type = "B" writer.write_batch(obj) writer.close() buf = sink.getvalue() buffers = [buf] - return header, buffers + return (batch_type,), buffers, True - def deserialize(self, header: Dict, buffers: List, context: Dict): - reader = pa.RecordBatchStreamReader(pa.BufferReader(buffers[0])) - if header["type"] == "Table": + def deserial(self, serialized: Dict, context: Dict, subs: List): + reader = pa.RecordBatchStreamReader(pa.BufferReader(subs[0])) + if serialized[0] == "T": return reader.read_all() else: return reader.read_next_batch() diff --git a/mars/serialization/core.py b/mars/serialization/core.py deleted file mode 100644 index e94d5f78d3..0000000000 --- a/mars/serialization/core.py +++ /dev/null @@ -1,479 +0,0 @@ -# Copyright 1999-2021 Alibaba Group Holding Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import datetime -import inspect -import sys -import types -from functools import partial, wraps -from typing import Any, Dict, List -from weakref import WeakKeyDictionary - -import pandas as pd - -from ..utils import TypeDispatcher - -import cloudpickle - -if sys.version_info[:2] < (3, 8): - try: - import pickle5 as pickle # nosec # pylint: disable=import_pickle - except ImportError: - import pickle # nosec # pylint: disable=import_pickle -else: - import pickle # nosec # pylint: disable=import_pickle - -HAS_PICKLE_BUFFER = pickle.HIGHEST_PROTOCOL >= 5 -BUFFER_PICKLE_PROTOCOL = max(pickle.DEFAULT_PROTOCOL, 5) -_PANDAS_HAS_MGR = hasattr(pd.Series([0]), "_mgr") - -_serial_dispatcher = TypeDispatcher() -_deserializers = dict() - - -class Serializer: - serializer_name = None - - def serialize(self, obj: Any, context: Dict): - raise NotImplementedError - - def deserialize(self, header: Dict, buffers: List, context: Dict): - raise NotImplementedError - - @classmethod - def register(cls, obj_type): - inst = cls() - _serial_dispatcher.register(obj_type, inst) - _deserializers[cls.serializer_name] = inst - - @classmethod - def unregister(cls, obj_type): - _serial_dispatcher.unregister(obj_type) - _deserializers.pop(cls.serializer_name, None) - - -def buffered(func): - @wraps(func) - def wrapped(self, obj: Any, context: Dict): - if id(obj) in context: - return {"id": id(obj), "serializer": "ref"}, [] - else: - context[id(obj)] = obj - return func(self, obj, context) - - return wrapped - - -def is_basic_type(o): - # Avoid isinstance since it's pretty time consuming - return type(o) in _basic_types - - -_basic_types = { - str, - int, - float, - complex, - datetime.datetime, - datetime.date, - datetime.timedelta, - type(max), -} - - -def pickle_buffers(obj): - buffers = [None] - if HAS_PICKLE_BUFFER: - - def buffer_cb(x): - x = x.raw() - if x.ndim > 1: - # ravel n-d memoryview - x = x.cast(x.format) - buffers.append(memoryview(x)) - - buffers[0] = cloudpickle.dumps( - obj, - buffer_callback=buffer_cb, - protocol=BUFFER_PICKLE_PROTOCOL, - ) - else: # pragma: no cover - buffers[0] = cloudpickle.dumps(obj) - return buffers - - -def unpickle_buffers(buffers): - result = cloudpickle.loads(buffers[0], buffers=buffers[1:]) - - # as pandas prior to 1.1.0 use _data instead of _mgr to hold BlockManager, - # deserializing from high versions may produce mal-functioned pandas objects, - # thus the patch is needed - if _PANDAS_HAS_MGR: - return result - else: # pragma: no cover - if hasattr(result, "_mgr") and isinstance(result, (pd.DataFrame, pd.Series)): - result._data = getattr(result, "_mgr") - delattr(result, "_mgr") - return result - - -class ScalarSerializer(Serializer): - serializer_name = "scalar" - - def serialize(self, obj: Any, context: Dict): - header = {"val": obj} - return header, [] - - def deserialize(self, header: Dict, buffers: List, context: Dict): - return header["val"] - - -class StrSerializer(Serializer): - serializer_name = "str" - - @buffered - def serialize(self, obj, context: Dict): - bytes_data = obj.encode() - return {}, [bytes_data] - - def deserialize(self, header: Dict, 
buffers: List, context: Dict): - buffer = buffers[0] - if type(buffer) is memoryview: - buffer = buffer.tobytes() - return buffer.decode() - - -class BytesSerializer(Serializer): - serializer_name = "bytes" - - @buffered - def serialize(self, obj, context: Dict): - return {}, [obj] - - def deserialize(self, header: Dict, buffers: List, context: Dict): - return buffers[0] - - -class PickleSerializer(Serializer): - serializer_name = "pickle" - - @buffered - def serialize(self, obj, context: Dict): - return {}, pickle_buffers(obj) - - def deserialize(self, header: Dict, buffers: List, context: Dict): - return unpickle_buffers(buffers) - - -class CollectionSerializer(Serializer): - obj_type = None - - @staticmethod - def _serialize(c): - headers = [None] * len(c) - buffers_list = [None] * len(c) - for idx, obj in enumerate(c): - if is_basic_type(obj): - header, buffers = obj, [] - else: - header, buffers = yield obj - headers[idx] = header - buffers_list[idx] = buffers - return headers, buffers_list - - @buffered - def serialize(self, obj: Any, context: Dict): - buffers = [] - headers_list, buffers_list = yield from self._serialize(obj) - for b in buffers_list: - buffers.extend(b) - headers = {"headers": headers_list} - if type(obj) is not self.obj_type: - headers["obj_type"] = type(obj) - return headers, buffers - - @staticmethod - def _list_deserial(headers: Dict, buffers: List): - pos = 0 - ret = [None] * len(headers) - for idx, sub_header in enumerate(headers): - if type(sub_header) is dict: - buf_num = sub_header.get("buf_num", 0) - sub_buffers = buffers[pos : pos + buf_num] - ret[idx] = yield sub_header, sub_buffers - pos += buf_num - else: - ret[idx] = sub_header - return ret - - def deserialize(self, header: Dict, buffers: List, context: Dict): - obj_type = self.obj_type - if "obj_type" in header: - obj_type = header["obj_type"] - if hasattr(obj_type, "_fields"): - # namedtuple - return obj_type( - *(yield from self._list_deserial(header["headers"], buffers)) - ) - else: - return obj_type( - (yield from self._list_deserial(header["headers"], buffers)) - ) - - -class ListSerializer(CollectionSerializer): - serializer_name = "list" - obj_type = list - - def deserialize(self, header: Dict, buffers: List, context: Dict): - ret = yield from super().deserialize(header, buffers, context) - for idx, v in enumerate(ret): - if type(v) is Placeholder: - v.callbacks.append(partial(ret.__setitem__, idx)) - return ret - - -class TupleSerializer(CollectionSerializer): - serializer_name = "tuple" - obj_type = tuple - - def deserialize(self, header: Dict, buffers: List, context: Dict): - ret = yield from super().deserialize(header, buffers, context) - assert not any(type(v) is Placeholder for v in ret) - return ret - - -class DictSerializer(CollectionSerializer): - serializer_name = "dict" - _inspected_inherits = set() - - @buffered - def serialize(self, obj: Dict, context: Dict): - obj_type = type(obj) - if obj_type is not dict and obj_type not in self._inspected_inherits: - inspect_init = inspect.getfullargspec(obj_type.__init__) - if ( - inspect_init.args == ["self"] - and not inspect_init.varargs - and not inspect_init.varkw - ): - # dict inheritance - # remove context to generate real serialized result - context.pop(id(obj)) - PickleSerializer.register(obj_type) - return (yield obj) - else: - self._inspected_inherits.add(obj_type) - - key_headers, key_buffers_list = yield from self._serialize(obj.keys()) - value_headers, value_buffers_list = yield from self._serialize(obj.values()) - - buffers = 
[] - for b in key_buffers_list: - buffers.extend(b) - key_buf_num = len(buffers) - - for b in value_buffers_list: - buffers.extend(b) - - header = { - "key_headers": key_headers, - "key_buf_num": key_buf_num, - "value_headers": value_headers, - } - if type(obj) is not dict: - header["obj_type"] = pickle.dumps(type(obj)) - - return header, buffers - - def deserialize(self, header: Dict, buffers: List, context: Dict): - key_buffers = buffers[: header["key_buf_num"]] - value_buffers = buffers[header["key_buf_num"] :] - - obj_type = dict - if "obj_type" in header: - obj_type = pickle.loads(header["obj_type"]) - - keys = yield from self._list_deserial(header["key_headers"], key_buffers) - values = yield from self._list_deserial(header["value_headers"], value_buffers) - - def _key_replacer(key, real_key): - ret[real_key] = ret.pop(key) - - def _value_replacer(key, real_value): - if type(key) is Placeholder: - key = context[key.id] - ret[key] = real_value - - try: - ret = obj_type(zip(keys, values)) - except TypeError: - # defaultdict - ret = obj_type() - ret.update(zip(keys, values)) - for k, v in zip(keys, values): - if type(k) is Placeholder: - k.callbacks.append(partial(_key_replacer, k)) - if type(v) is Placeholder: - v.callbacks.append(partial(_value_replacer, k)) - return ret - - -_cached_pickle_serialized = WeakKeyDictionary() - - -def cached_pickle_dumps(obj): - serialized = _cached_pickle_serialized.get(obj) - if serialized is None: - _cached_pickle_serialized[obj] = serialized = cloudpickle.dumps(obj) - return serialized - - -PickleSerializer.register(object) -ScalarSerializer.register(bool) -ScalarSerializer.register(int) -ScalarSerializer.register(float) -ScalarSerializer.register(complex) -ScalarSerializer.register(type(None)) -BytesSerializer.register(bytes) -StrSerializer.register(str) -ListSerializer.register(list) -TupleSerializer.register(tuple) -DictSerializer.register(dict) - - -class Placeholder: - id: int - callbacks: List - - def __init__(self, id_: int): - self.id = id_ - self.callbacks = [] - - def __hash__(self): - return hash(self.id) - - def __eq__(self, other): # pragma: no cover - if type(other) is not Placeholder: - return False - return self.id == other.id - - -def serialize(obj, context: Dict = None): - def _wrap_headers(_obj, _serializer_name, _header, _buffers): - if _header.get("serializer") == "ref": - return _header, _buffers - # if serializer already defined, do not change - _header["serializer"] = _header.get("serializer", _serializer_name) - if len(_buffers) > 0: - _header["buf_num"] = len(_buffers) - _header["id"] = id(_obj) - return _header, _buffers - - context = context if context is not None else dict() - - serializer = _serial_dispatcher.get_handler(type(obj)) - result = serializer.serialize(obj, context) - - if not isinstance(result, types.GeneratorType): - # result is not a generator, return directly - header, buffers = result - return _wrap_headers(obj, serializer.serializer_name, header, buffers) - else: - # result is a generator, iter it till final result - gen_stack = [(result, obj, serializer.serializer_name)] - last_serial = None - while gen_stack: - gen, call_obj, ser_name = gen_stack[-1] - try: - gen_to_serial = gen.send(last_serial) - gen_serializer = _serial_dispatcher.get_handler(type(gen_to_serial)) - gen_result = gen_serializer.serialize(gen_to_serial, context) - if isinstance(gen_result, types.GeneratorType): - # when intermediate result still generator, push its contexts - # into stack and handle it first - gen_stack.append( - 
(gen_result, gen_to_serial, gen_serializer.serializer_name) - ) - # result need to be emptied to run the generator - last_serial = None - else: - # when intermediate result is not generator, pass it - # to the generator again - last_serial = _wrap_headers( - gen_to_serial, gen_serializer.serializer_name, *gen_result - ) - except StopIteration as si: - # when current generator finishes, jump to the previous one - # and pass final result to it - gen_stack.pop() - last_serial = _wrap_headers(call_obj, ser_name, *si.value) - - return last_serial - - -def deserialize(header: Dict, buffers: List, context: Dict = None): - def _deserialize(_header, _buffers): - serializer_name = _header["serializer"] - obj_id = _header["id"] - if serializer_name == "ref": - try: - result = context[obj_id] - except KeyError: - result = context[obj_id] = Placeholder(obj_id) - else: - serializer = _deserializers[serializer_name] - result = serializer.deserialize(_header, _buffers, context) - if not isinstance(result, types.GeneratorType): - _fill_context(obj_id, result) - return result - - def _fill_context(obj_id, result): - context_val, context[obj_id] = context.get(obj_id), result - if type(context_val) is Placeholder: - for cb in context_val.callbacks: - cb(result) - - context = context if context is not None else dict() - - deserialized = _deserialize(header, buffers) - if not isinstance(deserialized, types.GeneratorType): - # result is not a generator, return directly - return deserialized - else: - # result is a generator, iter it till final result - gen_stack = [(deserialized, (header, buffers))] - last_deserial = None - while gen_stack: - gen, to_deserial = gen_stack[-1] - try: - gen_to_deserial = gen.send(last_deserial) - gen_deserialized = _deserialize(*gen_to_deserial) - if isinstance(gen_deserialized, types.GeneratorType): - # when intermediate result still generator, push its contexts - # into stack and handle it first - gen_stack.append((gen_deserialized, gen_to_deserial)) - # result need to be emptied to run the generator - last_deserial = None - else: - # when intermediate result is not generator, pass it - # to the generator again - last_deserial = gen_deserialized - except StopIteration as si: - # when current generator finishes, jump to the previous one - # and pass final result to it - gen_stack.pop() - last_deserial = si.value - # remember to fill Placeholders when some result is generated - _fill_context(to_deserial[0]["id"], last_deserial) - return last_deserial diff --git a/mars/serialization/core.pyi b/mars/serialization/core.pyi new file mode 100644 index 0000000000..44f8c3b58a --- /dev/null +++ b/mars/serialization/core.pyi @@ -0,0 +1,37 @@ +# Copyright 1999-2021 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Callable, Dict, List, Tuple + +def buffered(func: Callable) -> Callable: ... +def short_id(obj: Any) -> int: ... + +class Serializer: + serializer_id: int + def serial(self, obj: Any, context: Dict): ... 
+    def deserial(self, serialized: Tuple, context: Dict, subs: List[Any]): ...
+    @classmethod
+    def register(cls, obj_type): ...
+    @classmethod
+    def unregister(cls, obj_type): ...
+
+class Placeholder:
+    id: int
+    callbacks: List[Callable]
+    def __init__(self, id_: int): ...
+    def __hash__(self): ...
+    def __eq__(self, other): ...
+
+def serialize(obj: Any, context: Dict = None): ...
+def deserialize(headers: List, buffers: List, context: Dict = None): ...
diff --git a/mars/serialization/core.pyx b/mars/serialization/core.pyx
new file mode 100644
index 0000000000..ac661ed503
--- /dev/null
+++ b/mars/serialization/core.pyx
@@ -0,0 +1,585 @@
+# Copyright 1999-2022 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import inspect
+import sys
+from cpython cimport PyObject
+from functools import partial, wraps
+from libc.stdint cimport uint32_t, int64_t, uintptr_t
+from typing import Any, Dict, List
+
+import numpy as np
+import pandas as pd
+
+from .._utils cimport TypeDispatcher
+from ..utils import tokenize_int
+
+import cloudpickle
+
+if sys.version_info[:2] < (3, 8):
+    try:
+        import pickle5 as pickle  # nosec  # pylint: disable=import_pickle
+    except ImportError:
+        import pickle  # nosec  # pylint: disable=import_pickle
+else:
+    import pickle  # nosec  # pylint: disable=import_pickle
+
+BUFFER_PICKLE_PROTOCOL = max(pickle.DEFAULT_PROTOCOL, 5)
+cdef bint HAS_PICKLE_BUFFER = pickle.HIGHEST_PROTOCOL >= 5
+cdef bint _PANDAS_HAS_MGR = hasattr(pd.Series([0]), "_mgr")
+
+
+cdef TypeDispatcher _serial_dispatcher = TypeDispatcher()
+cdef dict _deserializers = dict()
+
+cdef uint32_t _MAX_STR_PRIMITIVE_LEN = 1024
+
+
+cdef class Serializer:
+    serializer_id = None
+
+    cpdef serial(self, obj: Any, dict context):
+        raise NotImplementedError
+
+    cpdef deserial(self, tuple serialized, dict context, list subs):
+        raise NotImplementedError
+
+    @classmethod
+    def calc_default_serializer_id(cls):
+        return tokenize_int(f"{cls.__module__}.{cls.__qualname__}") & 0x7fffffff
+
+    @classmethod
+    def register(cls, obj_type):
+        inst = cls()
+        if (
+            cls.serializer_id is None
+            or cls.serializer_id == getattr(super(cls, cls), "serializer_id")
+        ):
+            # a class should have its own serializer_id without inheritance
+            cls.serializer_id = cls.calc_default_serializer_id()
+        _serial_dispatcher.register(obj_type, inst)
+        _deserializers[cls.serializer_id] = inst
+
+    @classmethod
+    def unregister(cls, obj_type):
+        _serial_dispatcher.unregister(obj_type)
+        _deserializers.pop(cls.serializer_id, None)
+
+
+cdef inline uint32_t _short_id(object obj) nogil:
+    cdef void* ptr = <PyObject*>obj
+    return (<uintptr_t>ptr) & 0xffffffff
+
+
+def short_id(obj):
+    return _short_id(obj)
+
+
+def buffered(func):
+    @wraps(func)
+    def wrapped(self, obj: Any, dict context):
+        cdef uint32_t short_id = _short_id(obj)
+        if short_id in context:
+            return Placeholder(_short_id(obj))
+        else:
+            context[short_id] = obj
+            return func(self, obj, context)
+
+    return wrapped
+
+
+def pickle_buffers(obj):
+    buffers = [None]
+    if HAS_PICKLE_BUFFER:
+
+ def buffer_cb(x): + x = x.raw() + if x.ndim > 1: + # ravel n-d memoryview + x = x.cast(x.format) + buffers.append(memoryview(x)) + + buffers[0] = cloudpickle.dumps( + obj, + buffer_callback=buffer_cb, + protocol=BUFFER_PICKLE_PROTOCOL, + ) + else: # pragma: no cover + buffers[0] = cloudpickle.dumps(obj) + return buffers + + +def unpickle_buffers(buffers): + result = cloudpickle.loads(buffers[0], buffers=buffers[1:]) + + # as pandas prior to 1.1.0 use _data instead of _mgr to hold BlockManager, + # deserializing from high versions may produce mal-functioned pandas objects, + # thus the patch is needed + if _PANDAS_HAS_MGR: + return result + else: # pragma: no cover + if hasattr(result, "_mgr") and isinstance(result, (pd.DataFrame, pd.Series)): + result._data = getattr(result, "_mgr") + delattr(result, "_mgr") + return result + + +cdef class PickleSerializer(Serializer): + serializer_id = 0 + + cpdef serial(self, obj: Any, dict context): + cdef uint32_t obj_id + obj_id = _short_id(obj) + if obj_id in context: + return Placeholder(obj_id) + context[obj_id] = obj + + return (), pickle_buffers(obj), True + + cpdef deserial(self, tuple serialized, dict context, list subs): + return unpickle_buffers(subs) + + +cdef set _primitive_types = { + type(None), + bool, + int, + float, + complex, + datetime.datetime, + datetime.date, + datetime.timedelta, + type(max), # builtin functions + np.dtype, + np.number, +} + + +class PrimitiveSerializer(Serializer): + serializer_id = 1 + + @buffered + def serial(self, obj: Any, context: Dict): + return (obj,), [], True + + def deserial(self, tuple obj, context: Dict, subs: List[Any]): + return obj[0] + + +cdef class BytesSerializer(Serializer): + serializer_id = 2 + + cpdef serial(self, obj: Any, dict context): + cdef uint32_t obj_id + obj_id = _short_id(obj) + if obj_id in context: + return Placeholder(obj_id) + context[obj_id] = obj + + return (), [obj], True + + cpdef deserial(self, tuple serialized, dict context, list subs): + return subs[0] + + +cdef class StrSerializer(Serializer): + serializer_id = 3 + + cpdef serial(self, obj: Any, dict context): + cdef uint32_t obj_id + obj_id = _short_id(obj) + if obj_id in context: + return Placeholder(obj_id) + context[obj_id] = obj + + return (), [(obj).encode()], True + + cpdef deserial(self, tuple serialized, dict context, list subs): + buffer = subs[0] + if type(buffer) is memoryview: + buffer = buffer.tobytes() + return buffer.decode() + + +cdef class CollectionSerializer(Serializer): + obj_type = None + + cpdef tuple _serial_iterable(self, obj: Any): + cdef list idx_to_propagate = [] + cdef list obj_to_propagate = [] + cdef list obj_list = list(obj) + cdef int64_t idx + + for idx in range(len(obj_list)): + item = obj_list[idx] + item_type = type(item) + + if ( + (item_type is bytes or item_type is str) + and len(item) < _MAX_STR_PRIMITIVE_LEN + ): + # treat short strings as primitives + continue + elif item_type in _primitive_types: + continue + + obj_list[idx] = None + idx_to_propagate.append(idx) + obj_to_propagate.append(item) + + if self.obj_type is not None and type(obj) is not self.obj_type: + obj_type = type(obj) + else: + obj_type = None + return (obj_list, idx_to_propagate, obj_type), obj_to_propagate, False + + cpdef serial(self, obj: Any, dict context): + cdef uint32_t obj_id + obj_id = _short_id(obj) + if obj_id in context: + return Placeholder(obj_id) + context[obj_id] = obj + + return self._serial_iterable(obj) + + cpdef list _deserial_iterable(self, tuple serialized, list subs): + cdef list 
res_list, idx_to_propagate
+        cdef int64_t i
+
+        res_list, idx_to_propagate, _ = serialized
+
+        for i in range(len(idx_to_propagate)):
+            res_list[idx_to_propagate[i]] = subs[i]
+        return res_list
+
+
+cdef class TupleSerializer(CollectionSerializer):
+    serializer_id = 4
+    obj_type = tuple
+
+    cpdef deserial(self, tuple serialized, dict context, list subs):
+        cdef list res = self._deserial_iterable(serialized, subs)
+        for v in res:
+            assert type(v) is not Placeholder
+
+        obj_type = serialized[-1] or tuple
+        if hasattr(obj_type, "_fields"):
+            # namedtuple
+            return obj_type(*res)
+        else:
+            return obj_type(res)
+
+
+cdef class ListSerializer(CollectionSerializer):
+    serializer_id = 5
+    obj_type = list
+
+    cpdef deserial(self, tuple serialized, dict context, list subs):
+        cdef int64_t idx
+        cdef list res = self._deserial_iterable(serialized, subs)
+
+        obj_type = serialized[-1]
+        if obj_type is None:
+            result = res
+        else:
+            result = obj_type(res)
+
+        for idx, v in enumerate(res):
+            if type(v) is Placeholder:
+                (<Placeholder>v).callbacks.append(
+                    partial(result.__setitem__, idx)
+                )
+        return result
+
+
+def _dict_key_replacer(ret, key, real_key):
+    ret[real_key] = ret.pop(key)
+
+
+def _dict_value_replacer(context, ret, key, real_value):
+    if type(key) is Placeholder:
+        key = context[(<Placeholder>key).id]
+    ret[key] = real_value
+
+
+cdef class DictSerializer(CollectionSerializer):
+    serializer_id = 6
+    _inspected_inherits = set()
+
+    cpdef serial(self, obj: Any, dict context):
+        cdef uint32_t obj_id
+        cdef tuple key_obj, value_obj
+        cdef list key_bufs, value_bufs
+
+        obj_id = _short_id(obj)
+        if obj_id in context:
+            return Placeholder(obj_id)
+        context[obj_id] = obj
+
+        obj_type = type(obj)
+        if obj_type is not dict and obj_type not in self._inspected_inherits:
+            inspect_init = inspect.getfullargspec(obj_type.__init__)
+            if (
+                inspect_init.args == ["self"]
+                and not inspect_init.varargs
+                and not inspect_init.varkw
+            ):
+                # dict inheritance
+                # remove context to generate real serialized result
+                context.pop(obj_id)
+                return (obj,), [], True
+            else:
+                self._inspected_inherits.add(obj_type)
+
+        key_obj, key_bufs, _ = self._serial_iterable(obj.keys())
+        value_obj, value_bufs, _ = self._serial_iterable(obj.values())
+        if type(obj) is not dict:
+            obj_type = type(obj)
+        else:
+            obj_type = None
+        ser_obj = (key_obj[:-1], value_obj[:-1], len(key_bufs), obj_type)
+        return ser_obj, key_bufs + value_bufs, False
+
+    cpdef deserial(self, tuple serialized, dict context, list subs):
+        cdef int64_t i, num_key_bufs
+        cdef list key_subs, value_subs, keys, values
+
+        if len(serialized) == 1:
+            # serialized directly
+            return serialized[0]
+
+        key_serialized, value_serialized, num_key_bufs, obj_type = serialized
+        key_subs = subs[:num_key_bufs]
+        value_subs = subs[num_key_bufs:]
+
+        keys = self._deserial_iterable(key_serialized + (None,), key_subs)
+        values = self._deserial_iterable(value_serialized + (None,), value_subs)
+
+        if obj_type is None:
+            ret = dict(zip(keys, values))
+        else:
+            try:
+                ret = obj_type(zip(keys, values))
+            except TypeError:
+                # defaultdict
+                ret = obj_type()
+                ret.update(zip(keys, values))
+
+        for i in range(len(keys)):
+            k, v = keys[i], values[i]
+            if type(k) is Placeholder:
+                (<Placeholder>k).callbacks.append(
+                    partial(_dict_key_replacer, ret, k)
+                )
+            if type(v) is Placeholder:
+                (<Placeholder>v).callbacks.append(
+                    partial(_dict_value_replacer, context, ret, k)
+                )
+        return ret
+
+
+cdef class Placeholder:
+    cpdef public uint32_t id
+    cpdef public list callbacks
+
+    def __init__(self, uint32_t id_):
+        self.id = id_
+        self.callbacks = []
+
+    def __hash__(self):
+        return hash(self.id)
+
+    def __eq__(self, other):  # pragma: no cover
+        if type(other) is not Placeholder:
+            return False
+        return self.id == other.id
+
+
+cdef class PlaceholderSerializer(Serializer):
+    serializer_id = 7
+
+    cpdef serial(self, obj: Any, dict context):
+        return ((<Placeholder>obj).id,), [], True
+
+    cpdef deserial(self, tuple serialized, dict context, list subs):
+        obj_id = serialized[0]
+        try:
+            return context[obj_id]
+        except KeyError:
+            return Placeholder(obj_id)
+
+
+PickleSerializer.register(object)
+for _primitive in _primitive_types:
+    PrimitiveSerializer.register(_primitive)
+BytesSerializer.register(bytes)
+StrSerializer.register(str)
+ListSerializer.register(list)
+TupleSerializer.register(tuple)
+DictSerializer.register(dict)
+PlaceholderSerializer.register(Placeholder)
+
+
+cdef class _SerialStackItem:
+    cdef public tuple serialized
+    cdef public list subs
+    cdef public list subs_serialized
+
+    def __cinit__(self, tuple serialized, list subs):
+        self.serialized = serialized
+        self.subs = subs
+        self.subs_serialized = []
+
+
+cdef tuple _serial_single(obj, dict context):
+    cdef uint32_t obj_id
+    cdef Serializer serializer
+    cdef tuple common_header, serialized
+
+    while True:
+        serializer = _serial_dispatcher.get_handler(type(obj))
+        ret_serial = serializer.serial(obj, context)
+        if type(ret_serial) is tuple:
+            serialized, subs, final = ret_serial
+
+            if type(obj) is Placeholder:
+                obj_id = (<Placeholder>obj).id
+            else:
+                obj_id = _short_id(obj)
+
+            common_header = (
+                serializer.serializer_id, obj_id, len(subs), final
+            )
+            break
+        else:
+            obj = ret_serial
+    return common_header + serialized, subs, final
+
+
+def serialize(obj, dict context = None):
+    cdef list serial_stack = []
+    cdef _SerialStackItem stack_item
+    cdef list result_bufs_list = []
+    cdef tuple serialized
+    cdef list subs
+    cdef bint final
+    cdef int64_t num_serialized
+
+    context = context if context is not None else dict()
+    serialized, subs, final = _serial_single(obj, context)
+    if final or not subs:
+        return ({}, serialized), subs
+
+    serial_stack.append(_SerialStackItem(serialized, subs))
+    serialized = None
+
+    while serial_stack:
+        stack_item = serial_stack[-1]
+        if serialized is not None:
+            stack_item.subs_serialized.append(serialized)
+        num_serialized = len(stack_item.subs_serialized)
+        if len(stack_item.subs) == num_serialized:
+            serialized = stack_item.serialized + tuple(stack_item.subs_serialized)
+            serial_stack.pop()
+        else:
+            serialized, subs, final = _serial_single(
+                stack_item.subs[num_serialized], context
+            )
+            if final or not subs:
+                if subs:
+                    result_bufs_list.extend(subs)
+            else:
+                stack_item = _SerialStackItem(serialized, subs)
+                serial_stack.append(stack_item)
+                serialized = None
+    return ({}, serialized), result_bufs_list
+
+
+cdef class _DeserialStackItem:
+    cdef public tuple serialized
+    cdef public tuple subs
+    cdef public list subs_deserialized
+
+    def __cinit__(self, tuple serialized, tuple subs):
+        self.serialized = serialized
+        self.subs = subs
+        self.subs_deserialized = []
+
+
+cdef void _fill_placeholders(dict context, obj_id, result):
+    context_val, context[obj_id] = context.get(obj_id), result
+    if type(context_val) is Placeholder:
+        for cb in (<Placeholder>context_val).callbacks:
+            cb(result)
+
+
+cdef _deserial_single(tuple serialized, dict context, list subs):
+    cdef Serializer serializer
+    cdef int64_t num_subs
+
+    serializer_id, obj_id, num_subs, final = serialized[:4]
+    serializer = _deserializers[serializer_id]
+    res = serializer.deserial(serialized[4:], context, 
subs) + + _fill_placeholders(context, obj_id, res) + return res + + +def deserialize(tuple serialized, list buffers, dict context = None): + cdef list deserial_stack = [] + cdef _DeserialStackItem stack_item + cdef int64_t num_subs, num_deserialized, buf_pos = 0 + cdef bint final + cdef Serializer serializer + cdef object deserialized = None + + context = context if context is not None else dict() + # drop extra prop field + serialized = serialized[-1] + serializer_id, obj_id, num_subs, final = serialized[:4] + if final or num_subs == 0: + return _deserial_single(serialized, context, buffers) + + deserial_stack.append( + _DeserialStackItem( + serialized[:-num_subs], serialized[-num_subs:] + ) + ) + + while deserial_stack: + stack_item = deserial_stack[-1] + if deserialized is not None: + stack_item.subs_deserialized.append(deserialized) + num_deserialized = len(stack_item.subs_deserialized) + if len(stack_item.subs) == num_deserialized: + deserialized = _deserial_single( + stack_item.serialized, context, stack_item.subs_deserialized + ) + deserial_stack.pop() + else: + serialized = stack_item.subs[num_deserialized] + serializer_id, obj_id, num_subs, final = serialized[:4] + if final or num_subs == 0: + deserialized = _deserial_single( + serialized, context, buffers[buf_pos : buf_pos + num_subs] + ) + buf_pos += num_subs + else: + stack_item = _DeserialStackItem( + serialized[:-num_subs], serialized[-num_subs:] + ) + deserial_stack.append(stack_item) + deserialized = None + return deserialized diff --git a/mars/serialization/cuda.py b/mars/serialization/cuda.py index fbb8e59a7a..a080e674fd 100644 --- a/mars/serialization/cuda.py +++ b/mars/serialization/cuda.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any, List, Dict +from typing import Any, List, Dict, Tuple import pandas as pd @@ -24,10 +24,8 @@ class CupySerializer(Serializer): - serializer_name = "cupy" - @buffered - def serialize(self, obj: Any, context: Dict): + def serial(self, obj: Any, context: Dict): if not (obj.flags["C_CONTIGUOUS"] or obj.flags["F_CONTIGUOUS"]): obj = cupy.array(obj, copy=True) @@ -37,20 +35,19 @@ def serialize(self, obj: Any, context: Dict): buffer = cupy.ndarray( shape=(obj.nbytes,), dtype=cupy.dtype("u1"), memptr=obj.data, strides=(1,) ) - return header, [buffer] + return (header,), [buffer], True - def deserialize(self, header: Dict, buffers: List, context: Dict): + def deserial(self, serialized: Tuple, context: Dict, subs: List): + (header,) = serialized return cupy.ndarray( shape=header["shape"], dtype=header["typestr"], - memptr=cupy.asarray(buffers[0]).data, + memptr=cupy.asarray(subs[0]).data, strides=header["strides"], ) class CudfSerializer(Serializer): - serializer_name = "cudf" - @staticmethod def _get_ext_index_type(index_obj): import cudf @@ -81,17 +78,18 @@ def _apply_index_type(obj, attr, header): new_index = multi_index_cls.from_tuples(original_index, names=header["names"]) setattr(obj, attr, new_index) - def serialize(self, obj: Any, context: Dict): + def serial(self, obj: Any, context: Dict): header, buffers = obj.device_serialize() if hasattr(obj, "columns"): header["_ext_columns"] = self._get_ext_index_type(obj.columns) if hasattr(obj, "index"): header["_ext_index"] = self._get_ext_index_type(obj.index) - return header, buffers + return (header,), buffers, True - def deserialize(self, header: Dict, buffers: List, context: Dict): + def deserial(self, serialized: Tuple, context: Dict, buffers: List): from cudf.core.abc import Serializable + (header,) = serialized col_header = header.pop("_ext_columns", None) index_header = header.pop("_ext_index", None) diff --git a/mars/serialization/exception.py b/mars/serialization/exception.py index b713d3a344..2ea5ebf4d4 100644 --- a/mars/serialization/exception.py +++ b/mars/serialization/exception.py @@ -31,18 +31,16 @@ def __init__(self, raw_error: Union[str, Exception]): class ExceptionSerializer(Serializer): - serializer_name = "pickle" - @buffered - def serialize(self, obj: Exception, context: Dict): + def serial(self, obj: Exception, context: Dict): try: buffers = pickle_buffers(obj) except (TypeError, pickle.PicklingError): buffers = pickle_buffers(UnpickleableError(obj)) - return {}, buffers + return (), buffers, True - def deserialize(self, header: Dict, buffers: List, context: Dict): - return unpickle_buffers(buffers) + def deserial(self, serialized: Dict, context: Dict, subs: List): + return unpickle_buffers(subs) ExceptionSerializer.register(Exception) diff --git a/mars/serialization/mars_objects.py b/mars/serialization/mars_objects.py index 74ef8c4998..3f80bf9bb4 100644 --- a/mars/serialization/mars_objects.py +++ b/mars/serialization/mars_objects.py @@ -24,20 +24,15 @@ class SparseNDArraySerializer(Serializer): - serializer_name = "mars.SparseNDArray" - @buffered - def serialize(self, obj: Any, context: Dict): + def serial(self, obj: Any, context: Dict): raw_header, raw_buffers = serialize(obj.raw, context) - header = { - "raw_header": raw_header, - "shape": list(obj.shape), - } - return header, raw_buffers - - def deserialize(self, header: Dict, buffers: List, context: Dict): - raw_csr = deserialize(header["raw_header"], buffers) - return SparseNDArray(raw_csr, shape=tuple(header["shape"])) + return (raw_header, 
obj.shape), raw_buffers, True + + def deserial(self, serialized: Dict, context: Dict, subs: List): + raw_header, obj_shape = serialized + raw_csr = deserialize(raw_header, subs) + return SparseNDArray(raw_csr, shape=tuple(obj_shape)) if sps: # pragma: no branch diff --git a/mars/serialization/numpy.py b/mars/serialization/numpy.py index dd4fe3c6af..1b969bdb4d 100644 --- a/mars/serialization/numpy.py +++ b/mars/serialization/numpy.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List +from typing import Any, Dict, List, Tuple import numpy as np @@ -20,15 +20,13 @@ class NDArraySerializer(Serializer): - serializer_name = "np_ndarray" - @buffered - def serialize(self, obj: np.ndarray, context: Dict): + def serial(self, obj: np.ndarray, context: Dict): header = {} if obj.dtype.hasobject: header["pickle"] = True buffers = pickle_buffers(obj) - return header, buffers + return (header,), buffers, True order = "C" if obj.flags.f_contiguous: @@ -55,11 +53,12 @@ def serialize(self, obj: np.ndarray, context: Dict): order=order, ) ) - return header, [memoryview(obj.ravel(order=order).view("uint8").data)] + return (header,), [memoryview(obj.ravel(order=order).view("uint8").data)], True - def deserialize(self, header: Dict, buffers: List, context: Dict): + def deserial(self, serialized: Tuple, context: Dict, subs: List[Any]): + header = serialized[0] if header["pickle"]: - return unpickle_buffers(buffers) + return unpickle_buffers(subs) try: dtype = np.lib.format.descr_to_dtype(header["descr"]) @@ -73,7 +72,7 @@ def deserialize(self, header: Dict, buffers: List, context: Dict): return np.ndarray( shape=tuple(header["shape"]), dtype=dtype, - buffer=buffers[0], + buffer=subs[0], strides=tuple(header["strides"]), order=header["order"], ) diff --git a/mars/serialization/ray.py b/mars/serialization/ray.py index dc5a157ac4..424faad956 100644 --- a/mars/serialization/ray.py +++ b/mars/serialization/ray.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Dict, Any +from typing import Any, Dict, List, Tuple from ..utils import lazy_import from .core import Serializer, buffered, PickleSerializer @@ -24,17 +24,13 @@ class RaySerializer(Serializer): """Return raw object to let ray do serialization.""" - serializer_name = "ray" - @buffered - def serialize(self, obj: Any, context: Dict): - header = {"o": obj} - buffers = [] - return header, buffers - - def deserialize(self, header: Dict, buffers: List, context: Dict): - assert not buffers - return header["o"] + def serial(self, obj: Any, context: Dict): + return (obj,), [], True + + def deserial(self, serialized: Tuple, context: Dict, subs: List[Any]): + assert not subs + return serialized[0] def register_ray_serializers(): diff --git a/mars/serialization/scipy.py b/mars/serialization/scipy.py index 1649df572f..d3cdf9ab60 100644 --- a/mars/serialization/scipy.py +++ b/mars/serialization/scipy.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple import numpy as np @@ -25,34 +25,38 @@ class CsrMatrixSerializer(Serializer): - serializer_name = "sps.csr_matrix" - @buffered - def serialize(self, obj: Any, context: Dict): + def serial(self, obj: Any, context: Dict): data_header, data_buffers = serialize(obj.data) idx_header, idx_buffers = serialize(obj.indices) indptr_header, indptr_buffers = serialize(obj.indptr) - header = { - "data_header": data_header, - "data_buf_num": len(data_buffers), - "idx_header": idx_header, - "idx_buf_num": len(idx_buffers), - "indptr_header": indptr_header, - "shape": list(obj.shape), - } - return header, data_buffers + idx_buffers + indptr_buffers + header = ( + data_header, # data_header + len(data_buffers), # data_buf_num + idx_header, # idx_header + len(idx_buffers), # idx_buf_num + indptr_header, # indptr_header + obj.shape, # shape + ) + return header, data_buffers + idx_buffers + indptr_buffers, True - def deserialize(self, header: Dict, buffers: List, context: Dict): - data_buf_num = header["data_buf_num"] - idx_buf_num = header["idx_buf_num"] - data_buffers = buffers[:data_buf_num] - idx_buffers = buffers[data_buf_num : data_buf_num + idx_buf_num] - indptr_buffers = buffers[data_buf_num + idx_buf_num :] + def deserial(self, serialized: Tuple, context: Dict, subs: List): + ( + data_header, + data_buf_num, + idx_header, + idx_buf_num, + indptr_header, + shape, + ) = serialized + data_buffers = subs[:data_buf_num] + idx_buffers = subs[data_buf_num : data_buf_num + idx_buf_num] + indptr_buffers = subs[data_buf_num + idx_buf_num :] - data = deserialize(header["data_header"], data_buffers) - indices = deserialize(header["idx_header"], idx_buffers) - indptr = deserialize(header["indptr_header"], indptr_buffers) - shape = tuple(header["shape"]) + data = deserialize(data_header, data_buffers) + indices = deserialize(idx_header, idx_buffers) + indptr = deserialize(indptr_header, indptr_buffers) + shape = tuple(shape) empty_arr = np.zeros(0, dtype=data.dtype) diff --git a/mars/serialization/serializables/core.py b/mars/serialization/serializables/core.py index 852a8d660f..6763dc0814 100644 --- a/mars/serialization/serializables/core.py +++ b/mars/serialization/serializables/core.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import weakref from collections import OrderedDict from functools import partial -from typing import Any, Dict, Generator, List, Type, Tuple +from typing import Any, Dict, List, Type, Tuple +import cloudpickle + +from ...utils import no_default from ..core import Serializer, Placeholder, buffered from .field import Field, OneOfField from .field_type import ( @@ -30,7 +34,7 @@ ) -_basic_field_type = ( +_primitive_field_types = ( PrimitiveFieldType, DtypeType, DatetimeType, @@ -39,12 +43,12 @@ ) -def serialize_by_pickle(field: Field): +def _is_field_primitive_compound(field: Field): if field.on_serialize is not None or field.on_deserialize is not None: return False def check_type(field_type): - if isinstance(field_type, _basic_field_type): + if isinstance(field_type, _primitive_field_types): return True if isinstance(field_type, (ListType, TupleType)): if all( @@ -54,7 +58,8 @@ def check_type(field_type): return True if isinstance(field_type, DictType): if all( - isinstance(element_type, _basic_field_type) or element_type is Ellipsis + isinstance(element_type, _primitive_field_types) + or element_type is Ellipsis for element_type in (field_type.key_type, field_type.value_type) ): return True @@ -86,14 +91,14 @@ def __new__(mcs, name: str, bases: Tuple[Type], properties: Dict): property_to_fields[k] = v v._attr_name = k - if serialize_by_pickle(v): + if _is_field_primitive_compound(v): pickle_fields.append(v) else: non_pickle_fields.append(v) properties["_FIELDS"] = property_to_fields - properties["_PICKLE_FIELDS"] = pickle_fields - properties["_NON_PICKLE_FIELDS"] = non_pickle_fields + properties["_PRIMITIVE_FIELDS"] = pickle_fields + properties["_NON_PRIMITIVE_FIELDS"] = non_pickle_fields slots = set(properties.pop("__slots__", set())) if property_to_fields: slots.add("_FIELD_VALUES") @@ -104,10 +109,14 @@ def __new__(mcs, name: str, bases: Tuple[Type], properties: Dict): class Serializable(metaclass=SerializableMeta): - __slots__ = () + __slots__ = ("__weakref__",) + + _cache_primitive_serial = False _FIELDS: Dict[str, Field] _FIELD_VALUES: Dict[str, Any] + _PRIMITIVE_FIELDS: List[str] + _NON_PRIMITIVE_FIELDS: List[str] def __init__(self, *args, **kwargs): if args: # pragma: no cover @@ -132,8 +141,7 @@ def copy(self) -> "Serializable": return copied -class _SkipStub: - pass +_primitive_serial_cache = weakref.WeakKeyDictionary() class SerializableSerializer(Serializer): @@ -141,8 +149,6 @@ class SerializableSerializer(Serializer): Leverage DictSerializer to perform serde. """ - serializer_name = "serializable" - @classmethod def _get_field_values(cls, obj: Serializable, fields): attr_to_values = obj._FIELD_VALUES @@ -154,34 +160,27 @@ def _get_field_values(cls, obj: Serializable, fields): if field.on_serialize: value = field.on_serialize(value) except KeyError: - value = _SkipStub # Most field values are not None, serialize by list is more efficient than dict. + # Most field values are not None, serialize by list is more efficient than dict. 
+ value = no_default values.append(value) return values @buffered - def serialize(self, obj: Serializable, context: Dict): - pickles = self._get_field_values(obj, obj._PICKLE_FIELDS) - composed_values = self._get_field_values(obj, obj._NON_PICKLE_FIELDS) - - value_headers = [None] * len(composed_values) - value_sizes = [0] * len(composed_values) - value_buffers = [] - for idx, val in enumerate(composed_values): - value_headers[idx], val_buf = yield val - value_sizes[idx] = len(val_buf) - value_buffers.extend(val_buf) - - header = {"class": type(obj)} - if pickles: - header["pickles"] = pickles - if composed_values: - header["value_headers"] = value_headers - header["value_sizes"] = value_sizes - return header, value_buffers + def serial(self, obj: Serializable, context: Dict): + if obj._cache_primitive_serial and obj in _primitive_serial_cache: + primitive_vals = _primitive_serial_cache[obj] + else: + primitive_vals = self._get_field_values(obj, obj._PRIMITIVE_FIELDS) + if obj._cache_primitive_serial: + primitive_vals = cloudpickle.dumps(primitive_vals) + _primitive_serial_cache[obj] = primitive_vals + + compound_vals = self._get_field_values(obj, obj._NON_PRIMITIVE_FIELDS) + return (type(obj), primitive_vals), [compound_vals], False @classmethod def _set_field_value(cls, attr_to_values: dict, field: Field, value): - if value is _SkipStub: + if value is no_default: return attr_to_values[field.attr_name] = value if type(field) is not OneOfField: @@ -201,28 +200,21 @@ def cb(v, field_): else: cb(value, field) - def deserialize( - self, header: Dict, buffers: List, context: Dict - ) -> Generator[Any, Any, Serializable]: - obj_class: Type[Serializable] = header.pop("class") + def deserial(self, serialized: Tuple, context: Dict, subs: List) -> Serializable: + obj_class, primitives = serialized attr_to_values = dict() - pickles = header.get("pickles") - if pickles: - for value, field in zip(pickles, obj_class._PICKLE_FIELDS): + + if type(primitives) is not list: + primitives = cloudpickle.loads(primitives) + + if primitives: + for field, value in zip(obj_class._PRIMITIVE_FIELDS, primitives): self._set_field_value(attr_to_values, field, value) - if obj_class._NON_PICKLE_FIELDS: - pos = 0 - value_headers = header.get("value_headers") - value_sizes = header.get("value_sizes") - for field, value_header, value_size in zip( - obj_class._NON_PICKLE_FIELDS, value_headers, value_sizes - ): - value = ( - yield value_header, - buffers[pos : pos + value_size], - ) # noqa: E999 - pos += value_size + + if obj_class._NON_PRIMITIVE_FIELDS: + for field, value in zip(obj_class._NON_PRIMITIVE_FIELDS, subs[0]): self._set_field_value(attr_to_values, field, value) + obj = obj_class() obj._FIELD_VALUES = attr_to_values return obj diff --git a/mars/utils.py b/mars/utils.py index 9370608094..3cd1b14d50 100644 --- a/mars/utils.py +++ b/mars/utils.py @@ -355,7 +355,8 @@ def serialize_serializable(serializable, compress: bool = False): bio = io.BytesIO() header, buffers = serialize(serializable) - header["buf_sizes"] = [getattr(buf, "nbytes", len(buf)) for buf in buffers] + buf_sizes = [getattr(buf, "nbytes", len(buf)) for buf in buffers] + header[0]["buf_sizes"] = buf_sizes s_header = pickle.dumps(header) bio.write(struct.pack(" Date: Sun, 17 Apr 2022 12:45:06 +0800 Subject: [PATCH 2/3] Add comments and fetch-shuffle benchmark --- .github/workflows/core-ci.yml | 4 +- asv_bench/benchmarks/serialize.py | 47 +++++++ azure-pipelines.yml | 2 +- ci/copycheck.py | 34 ++--- ci/importcheck.py | 44 ++++--- docs/source/norm_zh.py | 
25 ++-- mars/serialization/core.pyx | 158 +++++++++++++++++++++--- mars/serialization/tests/test_serial.py | 24 +++- 8 files changed, 270 insertions(+), 68 deletions(-) diff --git a/.github/workflows/core-ci.yml b/.github/workflows/core-ci.yml index 0545e402cb..a407a7cd9f 100644 --- a/.github/workflows/core-ci.yml +++ b/.github/workflows/core-ci.yml @@ -18,9 +18,9 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: [3.7, 3.8, 3.9, 3.8-cython] + python-version: [3.7, 3.8, 3.9, 3.9-cython] include: - - { os: ubuntu-latest, python-version: 3.8-cython, no-common-tests: 1, + - { os: ubuntu-latest, python-version: 3.9-cython, no-common-tests: 1, no-deploy: 1, with-cython: 1 } steps: diff --git a/asv_bench/benchmarks/serialize.py b/asv_bench/benchmarks/serialize.py index 0e09a49da0..038c5e1186 100644 --- a/asv_bench/benchmarks/serialize.py +++ b/asv_bench/benchmarks/serialize.py @@ -16,6 +16,8 @@ import numpy as np import pandas as pd +from mars.core.operand import MapReduceOperand +from mars.dataframe.operands import DataFrameOperandMixin from mars.serialization import serialize, deserialize from mars.serialization.serializables import ( Serializable, @@ -38,6 +40,7 @@ ) from mars.services.subtask import Subtask from mars.services.task import new_task_id +from mars.utils import tokenize class SerializableChild(Serializable): @@ -168,3 +171,47 @@ def time_pickle_serialize_deserialize_tuple(self): def time_pickle_serialize_deserialize_dict(self): deserialize(*cloudpickle.loads(cloudpickle.dumps(serialize(self.test_dict)))) + + +class MockDFOperand(MapReduceOperand, DataFrameOperandMixin): + _op_type_ = 14320 + + +class SerializeFetchShuffleSuite: + def setup(self): + from mars.core import OutputType + from mars.core.operand import OperandStage + from mars.dataframe.operands import DataFrameShuffleProxy + from mars.utils import build_fetch + + source_chunks = [] + for i in range(1000): + op = MockDFOperand( + _output_types=[OutputType.dataframe], + _key=tokenize(i), + stage=OperandStage.map, + ) + source_chunks.append(op.new_chunk([], index=(i,))) + + shuffle_chunk = DataFrameShuffleProxy( + output_types=[OutputType.dataframe] + ).new_chunk(source_chunks) + + fetch_chunk = build_fetch(shuffle_chunk) + + self.test_fetch_chunks = [] + for i in range(1000): + reduce_op = MockDFOperand( + _output_types=[OutputType.dataframe], + _key=tokenize((i, 1)), + stage=OperandStage.reduce, + ) + self.test_fetch_chunks.append( + reduce_op.new_chunk([fetch_chunk], index=(i,)) + ) + + def time_pickle_serialize_fetch_shuffle_chunks(self): + for fetch_chunk in self.test_fetch_chunks: + header, buffers = serialize(fetch_chunk) + serialized = cloudpickle.dumps((header, buffers)) + deserialize(*cloudpickle.loads(serialized)) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 20d29addbf..b4f7e05326 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -158,7 +158,7 @@ jobs: set -e source ./ci/reload-env.sh - black --check --diff --verbose mars + black --check --diff --verbose mars asv_bench displayName: 'Check code style with black' - bash: | diff --git a/ci/copycheck.py b/ci/copycheck.py index c91c64b970..916bf0681a 100755 --- a/ci/copycheck.py +++ b/ci/copycheck.py @@ -18,20 +18,20 @@ from pathlib import PurePath _MATCH_FILES = [ - '*.py', - '*.pyx', + "*.py", + "*.pyx", ] _IGNORES = [ - 'mars/learn/**/*.pyx', - 'mars/lib/**/*.py', - 'mars/_version.py', + "mars/learn/**/*.pyx", + "mars/lib/**/*.py", + "mars/_version.py", ] def main(): root_path = 
os.path.dirname(os.path.dirname(os.path.abspath(__file__))) miss_files = [] - for root, _dirs, files in os.walk(os.path.join(root_path, 'mars')): + for root, _dirs, files in os.walk(os.path.join(root_path, "mars")): for fn in files: rel_path = os.path.relpath(os.path.join(root, fn), root_path) if any(PurePath(rel_path).match(patt) for patt in _IGNORES): @@ -40,18 +40,22 @@ def main(): continue file_path = os.path.join(root, fn) - with open(file_path, 'rb') as input_file: - file_lines = [line for line in input_file.read().split(b'\n') - if line.startswith(b'#')] - comments = b'\n'.join(file_lines) - if b'Copyright' not in comments: + with open(file_path, "rb") as input_file: + file_lines = [ + line + for line in input_file.read().split(b"\n") + if line.startswith(b"#") + ] + comments = b"\n".join(file_lines) + if b"Copyright" not in comments: miss_files.append(rel_path) if miss_files: - file_list = '\n '.join(miss_files) - sys.stderr.write(f'Please add missing copyright header for files:\n' - f' {file_list}\n') + file_list = "\n ".join(miss_files) + sys.stderr.write( + f"Please add missing copyright header for files:\n" f" {file_list}\n" + ) sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/ci/importcheck.py b/ci/importcheck.py index a05297f8a8..eb6f3303e7 100755 --- a/ci/importcheck.py +++ b/ci/importcheck.py @@ -21,8 +21,8 @@ from typing import List, NamedTuple, Optional, Tuple _IGNORES = [ - 'mars/lib/**/*.py', - 'conftest.py', + "mars/lib/**/*.py", + "conftest.py", ] @@ -35,27 +35,30 @@ class CheckResult(NamedTuple): @property def has_faults(self) -> bool: - return bool(self.absolute_imports) or bool(self.head_disorder) \ + return ( + bool(self.absolute_imports) + or bool(self.head_disorder) or bool(self.block_disorders) + ) def _check_absolute_import(node: ast.AST) -> List[Tuple[int, int]]: res = set() if isinstance(node, ast.Import): for import_name in node.names: - if import_name.name.startswith('mars.'): + if import_name.name.startswith("mars."): res.add((node.lineno, node.end_lineno)) elif isinstance(node, ast.ImportFrom): - if node.level == 0 and node.module.startswith('mars.'): + if node.level == 0 and node.module.startswith("mars."): res.add((node.lineno, node.end_lineno)) - elif getattr(node, 'body', []): + elif getattr(node, "body", []): for body_item in node.body: res.update(_check_absolute_import(body_item)) return sorted(res) def check_imports(file_path) -> CheckResult: - with open(file_path, 'rb') as src_file: + with open(file_path, "rb") as src_file: body = src_file.read() lines = body.splitlines() parsed = ast.parse(body, filename=file_path) @@ -66,38 +69,43 @@ def check_imports(file_path) -> CheckResult: def _extract_line_block(lines: List, lineno: int, end_lineno: int, indent: str): - grab_lines = '\n'.join(line.decode() for line in lines[lineno - 1:end_lineno]) + grab_lines = "\n".join(line.decode() for line in lines[lineno - 1 : end_lineno]) return textwrap.indent(textwrap.dedent(grab_lines), indent) def format_results(results: List[CheckResult], root_path): rel_import_count = sum(len(res.absolute_imports) for res in results) if rel_import_count > 0: - print(f'Do not use absolute imports for mars module in ' - f'code ({rel_import_count}):', file=sys.stderr) + print( + f"Do not use absolute imports for mars module in " + f"code ({rel_import_count}):", + file=sys.stderr, + ) for res in results: if not res.absolute_imports: continue rel_path = os.path.relpath(res.path, root_path) - print(' ' + rel_path, file=sys.stderr) + print(" " 
+ rel_path, file=sys.stderr) for lineno, end_lineno in res.absolute_imports: - print(f' Line {lineno}-{end_lineno}', file=sys.stderr) - print(_extract_line_block(res.lines, lineno, end_lineno, ' '), - file=sys.stderr) + print(f" Line {lineno}-{end_lineno}", file=sys.stderr) + print( + _extract_line_block(res.lines, lineno, end_lineno, " "), + file=sys.stderr, + ) def main(): root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) results = [] - for root, _dirs, files in os.walk(os.path.join(root_path, 'mars')): + for root, _dirs, files in os.walk(os.path.join(root_path, "mars")): for fn in files: - if '/tests' in root and not fn.startswith('test_'): + if "/tests" in root and not fn.startswith("test_"): # allow test auxiliary files to use full imports continue abs_path = os.path.join(root, fn) rel_path = os.path.relpath(abs_path, root_path) - if not fn.endswith('.py'): + if not fn.endswith(".py"): continue if any(PurePath(rel_path).match(patt) for patt in _IGNORES): continue @@ -111,5 +119,5 @@ def main(): sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/docs/source/norm_zh.py b/docs/source/norm_zh.py index 989e20ce22..dd1b3aeb51 100755 --- a/docs/source/norm_zh.py +++ b/docs/source/norm_zh.py @@ -31,7 +31,7 @@ def _zh_len(s): Calculate text length in Chinese """ try: - return len(s.encode('gb2312')) + return len(s.encode("gb2312")) except ValueError: return len(s) @@ -41,8 +41,9 @@ def _zh_split(s): Split text length in Chinese """ import jieba + try: - s.encode('ascii') + s.encode("ascii") has_zh = False except ValueError: has_zh = True @@ -54,7 +55,7 @@ def _zh_split(s): # code modified from babel.messages.pofile (hash 359ecffca479dfe032d0f7210d5cd8160599c816) -def _normalize(string, prefix='', width=76): +def _normalize(string, prefix="", width=76): r"""Convert a string into a format that is appropriate for .po files. >>> print(normalize('''Say: ... "hello, world!" 
@@ -97,7 +98,7 @@ def _normalize(string, prefix='', width=76):
                                 # separate line
                                 buf.append(chunks.pop())
                             break
-                lines.append(u''.join(buf))
+                lines.append("".join(buf))
             else:
                 lines.append(line)
     else:
@@ -109,8 +110,8 @@ def _normalize(string, prefix='', width=76):
     # Remove empty trailing line
     if lines and not lines[-1]:
         del lines[-1]
-    lines[-1] += '\n'
-    return u'""\n' + u'\n'.join([(prefix + escape(line)) for line in lines])
+    lines[-1] += "\n"
+    return '""\n' + "\n".join([(prefix + escape(line)) for line in lines])


 def main():
@@ -120,11 +121,11 @@ def main():
         return
     pofile.normalize = _normalize

-    for root, dirs, files in os.walk('.'):
-        if 'zh' not in root:
+    for root, dirs, files in os.walk("."):
+        if "zh" not in root:
             continue
         for f in files:
-            if not f.endswith('.po'):
+            if not f.endswith(".po"):
                 continue

             path = os.path.join(root, f)
@@ -133,11 +134,11 @@ def main():
             if (datetime.datetime.now() - modify_time).total_seconds() > 120:
                 continue

-            with open(path, 'rb') as inpf:
+            with open(path, "rb") as inpf:
                 catalog = pofile.read_po(inpf)
-            with open(path, 'wb') as outf:
+            with open(path, "wb") as outf:
                 pofile.write_po(outf, catalog)


-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
diff --git a/mars/serialization/core.pyx b/mars/serialization/core.pyx
index ac661ed503..2384487382 100644
--- a/mars/serialization/core.pyx
+++ b/mars/serialization/core.pyx
@@ -28,7 +28,7 @@ from ..utils import tokenize_int

 import cloudpickle

-if sys.version_info[:2] < (3, 8):
+if sys.version_info[:2] < (3, 8):  # pragma: no cover
     try:
         import pickle5 as pickle  # nosec  # pylint: disable=import_pickle
     except ImportError:
@@ -51,9 +51,55 @@ cdef class Serializer:
     serializer_id = None

     cpdef serial(self, obj: Any, dict context):
+        """
+        Returns the intermediate serialization result of an object.
+        The returned value can be a Placeholder or a tuple comprising
+        three parts: a header, a group of subcomponents and
+        a finalizing flag.
+
+        * Header is a pickle-serializable tuple
+        * Subcomponents are parts or buffers for iterative
+          serialization.
+        * Flag is a boolean value. If true, subcomponents should be
+          buffers (for instance, bytes, memory views, GPU buffers,
+          etc.) that can be read and written directly. If false,
+          subcomponents will be serialized iteratively.
+
+        Parameters
+        ----------
+        obj: Any
+            Object to serialize
+        context: Dict
+            Serialization context used to create Placeholder objects
+            that reduce duplicated serialization
+
+        Returns
+        -------
+        result: Placeholder | Tuple[Tuple, List, bool]
+            Intermediate result of serialization
+        """
         raise NotImplementedError

     cpdef deserial(self, tuple serialized, dict context, list subs):
+        """
+        Returns the deserialized object given its serialized header and
+        deserialized subcomponents.
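+
+        The ``serialized`` argument carries the serializer-specific part
+        of the header previously produced by ``serial``, while ``subs``
+        holds the deserialized counterparts of the subcomponents returned
+        there (or the raw buffers when the object was serialized as final).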
+ + Parameters + ---------- + serialized: Tuple + Serialized object header as a tuple + context + Serialization context for instantiation of Placeholder + objects + subs: List + Deserialized subcomponents + + Returns + ------- + result: Any + Deserialized objects + """ raise NotImplementedError @classmethod @@ -65,9 +111,10 @@ cdef class Serializer: inst = cls() if ( cls.serializer_id is None - or cls.serializer_id == getattr(super(cls, cls), "serializer_id") + or cls.serializer_id == getattr(super(cls, cls), "serializer_id", None) ): - # a class should have its own serializer_id without inheritance + # a class should have its own serializer_id + # inherited serializer_id not acceptable cls.serializer_id = cls.calc_default_serializer_id() _serial_dispatcher.register(obj_type, inst) _deserializers[cls.serializer_id] = inst @@ -84,10 +131,14 @@ cdef inline uint32_t _short_id(object obj) nogil: def short_id(obj): + """Short representation of id() used for serialization""" return _short_id(obj) def buffered(func): + """ + Wrapper for serial() method to reduce duplicated serialization + """ @wraps(func) def wrapped(self, obj: Any, dict context): cdef uint32_t short_id = _short_id(obj) @@ -298,9 +349,8 @@ cdef class ListSerializer(CollectionSerializer): for idx, v in enumerate(res): if type(v) is Placeholder: - (v).callbacks.append( - partial(result.__setitem__, idx) - ) + cb = partial(result.__setitem__, idx) + (v).callbacks.append(cb) return result @@ -336,7 +386,8 @@ cdef class DictSerializer(CollectionSerializer): and not inspect_init.varargs and not inspect_init.varkw ): - # dict inheritance + # inherited dicts may not have proper initializers + # for deserialization # remove context to generate real serialized result context.pop(obj_id) return (obj,), [], True @@ -373,7 +424,7 @@ cdef class DictSerializer(CollectionSerializer): try: ret = obj_type(zip(keys, values)) except TypeError: - # defaultdict + # first arg of defaultdict is a callable ret = obj_type() ret.update(zip(keys, values)) @@ -391,6 +442,12 @@ cdef class DictSerializer(CollectionSerializer): cdef class Placeholder: + """ + Placeholder object to reduce duplicated serialization + + The object records object identifier and keeps callbacks + to replace itself in parent objects. 
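+
+    For instance, when the same object occurs more than once in a
+    structure being serialized, later occurrences are emitted as
+    ``Placeholder`` objects, and the callbacks recorded here (for example
+    ``partial(result.__setitem__, idx)`` registered by ``ListSerializer``)
+    patch the real value back into its parents during deserialization.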
+ """ cpdef public uint32_t id cpdef public list callbacks @@ -399,13 +456,19 @@ cdef class Placeholder: self.callbacks = [] def __hash__(self): - return hash(self.id) + return self.id def __eq__(self, other): # pragma: no cover if type(other) is not Placeholder: return False return self.id == other.id + def __repr__(self): + return ( + f"Placeholder(id={self.id}, " + f"callbacks=[list of {len(self.callbacks)}])" + ) + cdef class PlaceholderSerializer(Serializer): serializer_id = 7 @@ -444,6 +507,7 @@ cdef class _SerialStackItem: cdef tuple _serial_single(obj, dict context): + """Serialize single object and return serialized tuples""" cdef uint32_t obj_id cdef Serializer serializer cdef tuple common_header, serialized @@ -452,6 +516,7 @@ cdef tuple _serial_single(obj, dict context): serializer = _serial_dispatcher.get_handler(type(obj)) ret_serial = serializer.serial(obj, context) if type(ret_serial) is tuple: + # object is serialized, form a common header and return serialized, subs, final = ret_serial if type(obj) is Placeholder: @@ -464,11 +529,29 @@ cdef tuple _serial_single(obj, dict context): ) break else: + # object is converted into another (usually a Placeholder) obj = ret_serial return common_header + serialized, subs, final def serialize(obj, dict context = None): + """ + Serialize an object and return a header and buffers. + Buffers are intended for zero-copy data manipulation. + + Parameters + ---------- + obj: Any + Object to serialize + context: + Serialization context for instantiation of Placeholder + objects + + Returns + ------- + result: Tuple[Tuple, List] + Picklable header and buffers + """ cdef list serial_stack = [] cdef _SerialStackItem stack_item cdef list result_bufs_list = [] @@ -480,6 +563,7 @@ def serialize(obj, dict context = None): context = context if context is not None else dict() serialized, subs, final = _serial_single(obj, context) if final or not subs: + # marked as a leaf node, return directly return ({}, serialized), subs serial_stack.append(_SerialStackItem(serialized, subs)) @@ -488,22 +572,33 @@ def serialize(obj, dict context = None): while serial_stack: stack_item = serial_stack[-1] if serialized is not None: + # have previously-serialized results, record first stack_item.subs_serialized.append(serialized) + num_serialized = len(stack_item.subs_serialized) if len(stack_item.subs) == num_serialized: + # all subcomponents serialized, serialization of current is done + # and we can move to the parent object serialized = stack_item.serialized + tuple(stack_item.subs_serialized) serial_stack.pop() else: + # serialize next subcomponent at stack top serialized, subs, final = _serial_single( stack_item.subs[num_serialized], context ) if final or not subs: + # the subcomponent is a leaf if subs: result_bufs_list.extend(subs) else: + # the subcomponent has its own subcomponents, we push itself + # into stack and process its children stack_item = _SerialStackItem(serialized, subs) serial_stack.append(stack_item) + # note that the serialized header should not be recorded + # as we are now processing the subcomponent itself serialized = None + # we keep an empty dict for extra metas required for other modules return ({}, serialized), result_bufs_list @@ -518,14 +613,8 @@ cdef class _DeserialStackItem: self.subs_deserialized = [] -cdef void _fill_placeholders(dict context, obj_id, result): - context_val, context[obj_id] = context.get(obj_id), result - if type(context_val) is Placeholder: - for cb in (context_val).callbacks: - cb(result) - - cdef 
_deserial_single(tuple serialized, dict context, list subs): + """Deserialize a single object""" cdef Serializer serializer cdef int64_t num_subs @@ -533,11 +622,35 @@ cdef _deserial_single(tuple serialized, dict context, list subs): serializer = _deserializers[serializer_id] res = serializer.deserial(serialized[4:], context, subs) - _fill_placeholders(context, obj_id, res) + # get previously-recorded context values + context_val, context[obj_id] = context.get(obj_id), res + # if previously recorded object is a Placeholder, + # replace it with callbacks + if type(context_val) is Placeholder: + for cb in (context_val).callbacks: + cb(res) return res def deserialize(tuple serialized, list buffers, dict context = None): + """ + Deserialize an object with serialized headers and buffers + + Parameters + ---------- + serialized: Tuple + Serialized object header + buffers: List + List of buffers extracted from serialize() calls + context: Dict + Serialization context for replacing Placeholder + objects + + Returns + ------- + result: Any + Deserialized object + """ cdef list deserial_stack = [] cdef _DeserialStackItem stack_item cdef int64_t num_subs, num_deserialized, buf_pos = 0 @@ -546,10 +659,11 @@ def deserialize(tuple serialized, list buffers, dict context = None): cdef object deserialized = None context = context if context is not None else dict() - # drop extra prop field + # drop extra meta field serialized = serialized[-1] serializer_id, obj_id, num_subs, final = serialized[:4] if final or num_subs == 0: + # marked as a leaf node, return directly return _deserial_single(serialized, context, buffers) deserial_stack.append( @@ -561,25 +675,33 @@ def deserialize(tuple serialized, list buffers, dict context = None): while deserial_stack: stack_item = deserial_stack[-1] if deserialized is not None: + # have previously-deserialized results, record first stack_item.subs_deserialized.append(deserialized) num_deserialized = len(stack_item.subs_deserialized) if len(stack_item.subs) == num_deserialized: + # all subcomponents deserialized, we can deserialize the object itself deserialized = _deserial_single( stack_item.serialized, context, stack_item.subs_deserialized ) deserial_stack.pop() else: + # select next subcomponent to process serialized = stack_item.subs[num_deserialized] serializer_id, obj_id, num_subs, final = serialized[:4] if final or num_subs == 0: + # next subcomponent is a leaf, just deserialize deserialized = _deserial_single( serialized, context, buffers[buf_pos : buf_pos + num_subs] ) buf_pos += num_subs else: + # next subcomponent has its own subcomponents, we push it + # into stack and start handling its children stack_item = _DeserialStackItem( serialized[:-num_subs], serialized[-num_subs:] ) deserial_stack.append(stack_item) + # note that the deserialized object should be cleaned + # as we are just starting to handle the subcomponent itself deserialized = None return deserialized diff --git a/mars/serialization/tests/test_serial.py b/mars/serialization/tests/test_serial.py index 70e575e3ff..f56c7911dd 100644 --- a/mars/serialization/tests/test_serial.py +++ b/mars/serialization/tests/test_serial.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import OrderedDict +from collections import defaultdict, OrderedDict import numpy as np import pandas as pd @@ -31,6 +31,7 @@ from ...tests.core import require_cupy, require_cudf from ...utils import lazy_import from .. 
import serialize, deserialize +from ..core import Placeholder cupy = lazy_import("cupy", globals=globals()) cudf = lazy_import("cudf", globals=globals()) @@ -53,6 +54,7 @@ class CustomList(list): CustomList([3, 4, CustomList([5, 6])]), {"abc": 5.6, "def": [3.4]}, OrderedDict([("abcd", 5.6)]), + defaultdict(lambda: 0, [("abcd", 0)]), ], ) def test_core(val): @@ -61,8 +63,26 @@ def test_core(val): assert val == deserialized +def test_strings(): + str_obj = "abcd" * 1024 + obj = [str_obj, str_obj] + header, bufs = serialize(obj) + assert len(header) < len(str_obj) * 2 + bufs = [memoryview(buf) for buf in bufs] + assert obj == deserialize(header, bufs) + + +def test_placeholder_obj(): + assert Placeholder(1024) == Placeholder(1024) + assert hash(Placeholder(1024)) == hash(Placeholder(1024)) + assert Placeholder(1024) != Placeholder(1023) + assert hash(Placeholder(1024)) != hash(Placeholder(1023)) + assert Placeholder(1024) != 1024 + assert "1024" in repr(Placeholder(1024)) + + def test_nested_list(): - val = ["a" * 100] * 100 + val = [b"a" * 1200] * 10 val[0] = val deserialized = deserialize(*serialize(val)) assert deserialized[0] is deserialized From eff32bcb1d12f57b40f8d24aa99a44e4f965e5dc Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Sun, 17 Apr 2022 14:06:03 +0800 Subject: [PATCH 3/3] Fix GPU errors --- ci/copycheck.py | 2 +- docs/source/norm_zh.py | 2 +- mars/core/context.py | 4 ++-- mars/core/entity/chunks.py | 2 +- mars/core/entity/tileables.py | 2 +- mars/core/graph/builder/chunk.py | 2 +- mars/core/operand/core.py | 2 +- mars/core/operand/fuse.py | 4 +--- mars/dataframe/arithmetic/__init__.py | 2 +- mars/dataframe/arithmetic/around.py | 2 +- mars/dataframe/arrays.py | 4 +--- mars/dataframe/base/cut.py | 2 +- mars/dataframe/base/drop_duplicates.py | 4 ++-- mars/dataframe/base/duplicated.py | 2 +- mars/dataframe/base/map.py | 2 +- mars/dataframe/datastore/to_parquet.py | 2 +- mars/dataframe/groupby/aggregation.py | 2 +- mars/dataframe/indexing/iloc.py | 2 +- mars/dataframe/indexing/index_lib.py | 2 +- mars/dataframe/indexing/loc.py | 2 +- mars/dataframe/indexing/reindex.py | 2 +- .../indexing/tests/test_indexing_execution.py | 4 +--- mars/dataframe/merge/concat.py | 2 +- mars/dataframe/reduction/core.py | 2 ++ mars/deploy/oscar/cmdline.py | 2 +- mars/deploy/oscar/ray.py | 2 +- mars/deploy/oscar/session.py | 10 +++++----- mars/deploy/oscar/tests/session.py | 2 +- mars/deploy/oscar/tests/test_checked_session.py | 12 ++++++------ mars/learn/cluster/_k_means_elkan_iter.py | 4 +--- mars/learn/cluster/_kmeans.py | 2 +- mars/learn/contrib/utils.py | 2 +- mars/learn/datasets/samples_generator.py | 6 ++---- .../learn/datasets/tests/test_samples_generator.py | 6 +++--- mars/learn/decomposition/_pca.py | 2 +- mars/learn/decomposition/_truncated_svd.py | 2 +- mars/learn/decomposition/tests/test_pca.py | 2 +- mars/learn/glm/_logistic.py | 2 +- mars/learn/metrics/_regresssion.py | 4 ++-- mars/learn/metrics/pairwise/manhattan.py | 2 +- mars/learn/model_selection/_split.py | 4 ++-- mars/learn/neighbors/base.py | 4 ++-- mars/learn/neighbors/tree.py | 4 ++-- mars/learn/preprocessing/_label.py | 12 ++++++------ mars/learn/proxima/simple_index/builder.py | 6 +++--- mars/learn/utils/shuffle.py | 2 +- mars/learn/utils/validation.py | 2 +- mars/lib/filesystem/arrow.py | 4 ++-- mars/lib/uhashring/ring.py | 2 +- mars/oscar/backends/communication/dummy.py | 2 +- mars/oscar/backends/communication/socket.py | 6 ++---- mars/oscar/backends/config.py | 2 +- mars/oscar/backends/pool.py | 6 +++--- 
mars/remote/run_script.py | 2 +- mars/serialization/serializables/field_type.py | 6 +++--- mars/services/lifecycle/supervisor/tracker.py | 8 ++------ mars/services/session/api/oscar.py | 8 ++------ mars/services/storage/core.py | 8 ++++---- mars/services/storage/spill.py | 4 ++-- mars/services/subtask/worker/processor.py | 14 +++++++------- mars/services/web/tests/test_core.py | 4 ++-- mars/storage/cuda.py | 12 ++++++------ mars/tensor/base/broadcast_to.py | 2 +- mars/tensor/base/delete.py | 2 +- mars/tensor/base/insert.py | 2 +- mars/tensor/base/partition.py | 2 +- mars/tensor/base/sort.py | 10 ++++------ mars/tensor/datasource/from_tiledb.py | 4 ++-- mars/tensor/einsum/einsumfunc.py | 2 +- mars/tensor/indexing/compress.py | 4 ++-- mars/tensor/indexing/take.py | 2 +- mars/tensor/lib/index_tricks.py | 2 +- mars/tensor/linalg/cholesky.py | 2 +- mars/tensor/linalg/inv.py | 2 +- mars/tensor/linalg/lu.py | 2 +- mars/tensor/linalg/qr.py | 2 +- mars/tensor/linalg/svd.py | 2 +- mars/tensor/merge/stack.py | 2 +- mars/tensor/reduction/core.py | 2 +- mars/tensor/spatial/distance/cdist.py | 2 +- mars/tensor/spatial/distance/pdist.py | 2 +- mars/tensor/spatial/distance/squareform.py | 4 ++-- mars/tensor/statistics/average.py | 2 +- mars/tensor/statistics/quantile.py | 2 +- mars/tensor/stats/ttest.py | 2 +- mars/tensor/utils.py | 4 +--- 86 files changed, 142 insertions(+), 164 deletions(-) diff --git a/ci/copycheck.py b/ci/copycheck.py index 916bf0681a..338ba07dca 100755 --- a/ci/copycheck.py +++ b/ci/copycheck.py @@ -52,7 +52,7 @@ def main(): if miss_files: file_list = "\n ".join(miss_files) sys.stderr.write( - f"Please add missing copyright header for files:\n" f" {file_list}\n" + f"Please add missing copyright header for files:\n {file_list}\n" ) sys.exit(1) diff --git a/docs/source/norm_zh.py b/docs/source/norm_zh.py index dd1b3aeb51..e6c1a2e2bf 100755 --- a/docs/source/norm_zh.py +++ b/docs/source/norm_zh.py @@ -121,7 +121,7 @@ def main(): return pofile.normalize = _normalize - for root, dirs, files in os.walk("."): + for root, _dirs, files in os.walk("."): if "zh" not in root: continue for f in files: diff --git a/mars/core/context.py b/mars/core/context.py index 8c93e34a06..7a0952eb4d 100644 --- a/mars/core/context.py +++ b/mars/core/context.py @@ -41,13 +41,13 @@ def __init__( # try to get session id from environment session_id = os.environ.get("MARS_SESSION_ID") if session_id is None: - raise ValueError("session_id should be provided " "to create a context") + raise ValueError("session_id should be provided to create a context") if supervisor_address is None: # try to get supervisor address from environment supervisor_address = os.environ.get("MARS_SUPERVISOR_ADDRESS") if supervisor_address is None: raise ValueError( - "supervisor_address should be provided " "to create a context" + "supervisor_address should be provided to create a context" ) self.session_id = session_id diff --git a/mars/core/entity/chunks.py b/mars/core/entity/chunks.py index efcb0ee210..74ee909686 100644 --- a/mars/core/entity/chunks.py +++ b/mars/core/entity/chunks.py @@ -25,7 +25,7 @@ class ChunkData(EntityData): def __repr__(self): if self.op.stage is None: - return f"Chunk " + return f"Chunk " else: return ( f"Chunk 0: raise TypeError( - "round() takes 0 positional arguments " f"but {len(args)} was given" + f"round() takes 0 positional arguments but {len(args)} was given" ) op = DataFrameAround(decimals=decimals, **kwargs) return op(df) diff --git a/mars/dataframe/arrays.py b/mars/dataframe/arrays.py index 
f164c4f5c0..4bfcc12561 100644 --- a/mars/dataframe/arrays.py +++ b/mars/dataframe/arrays.py @@ -232,9 +232,7 @@ def __init__(self, values, dtype: ArrowDtype = None, copy=False): # just for infer dtypes purpose self._init_by_numpy(values, dtype=dtype, copy=copy) else: - raise ImportError( - "Cannot create ArrowArray " "when `pyarrow` not installed" - ) + raise ImportError("Cannot create ArrowArray when `pyarrow` not installed") # for test purpose self._force_use_pandas = pandas_only diff --git a/mars/dataframe/base/cut.py b/mars/dataframe/base/cut.py index 880f4f933c..eba7d2ad43 100644 --- a/mars/dataframe/base/cut.py +++ b/mars/dataframe/base/cut.py @@ -266,7 +266,7 @@ def tile(cls, op): # calculate bins if np.isinf(min_val) or np.isinf(max_val): raise ValueError( - "cannot specify integer `bins` " "when input data contains infinity" + "cannot specify integer `bins` when input data contains infinity" ) elif min_val == max_val: # adjust end points before binning min_val -= 0.001 * abs(min_val) if min_val != 0 else 0.001 diff --git a/mars/dataframe/base/drop_duplicates.py b/mars/dataframe/base/drop_duplicates.py index baaf8db2e7..0209e9c861 100644 --- a/mars/dataframe/base/drop_duplicates.py +++ b/mars/dataframe/base/drop_duplicates.py @@ -357,7 +357,7 @@ def series_drop_duplicates(series, keep="first", inplace=False, method="auto"): """ if method not in ("auto", "tree", "shuffle", None): raise ValueError( - "method could only be one of " "'auto', 'tree', 'shuffle' or None" + "method could only be one of 'auto', 'tree', 'shuffle' or None" ) op = DataFrameDropDuplicates(keep=keep, method=method) return op(series, inplace=inplace) @@ -413,7 +413,7 @@ def index_drop_duplicates(index, keep="first", method="auto"): """ if method not in ("auto", "tree", "shuffle", None): raise ValueError( - "method could only be one of " "'auto', 'tree', 'shuffle' or None" + "method could only be one of 'auto', 'tree', 'shuffle' or None" ) op = DataFrameDropDuplicates(keep=keep, method=method) return op(index) diff --git a/mars/dataframe/base/duplicated.py b/mars/dataframe/base/duplicated.py index c5e360b79b..370c40f2a9 100644 --- a/mars/dataframe/base/duplicated.py +++ b/mars/dataframe/base/duplicated.py @@ -469,7 +469,7 @@ def series_duplicated(series, keep="first", method="auto"): """ if method not in ("auto", "tree", "shuffle", None): raise ValueError( - "method could only be one of " "'auto', 'tree', 'shuffle' or None" + "method could only be one of 'auto', 'tree', 'shuffle' or None" ) op = DataFrameDuplicated(keep=keep, method=method) return op(series) diff --git a/mars/dataframe/base/map.py b/mars/dataframe/base/map.py index 2b9f4357ce..a822a9e320 100644 --- a/mars/dataframe/base/map.py +++ b/mars/dataframe/base/map.py @@ -103,7 +103,7 @@ def __call__(self, series, dtype): if dtype is None: raise ValueError( - "cannot infer dtype, " "it needs to be specified manually for `map`" + "cannot infer dtype, it needs to be specified manually for `map`" ) else: dtype = np.int64 if dtype is int else dtype diff --git a/mars/dataframe/datastore/to_parquet.py b/mars/dataframe/datastore/to_parquet.py index 54f30df7cd..2a06e12a08 100644 --- a/mars/dataframe/datastore/to_parquet.py +++ b/mars/dataframe/datastore/to_parquet.py @@ -190,7 +190,7 @@ def execute(cls, ctx, op): ) else: # pragma: no cover raise NotImplementedError( - "Only support pyarrow engine when " "specify `partition_cols`." + "Only support pyarrow engine when specify `partition_cols`." 
) ctx[out.key] = pd.DataFrame() diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 2630f084c5..f24a1d2e5d 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -1048,7 +1048,7 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): method = "auto" if method not in ["shuffle", "tree", "auto"]: raise ValueError( - f"Method {method} is not available, " "please specify 'tree' or 'shuffle" + f"Method {method} is not available, please specify 'tree' or 'shuffle" ) if not is_funcs_aggregate(func, ndim=groupby.ndim): diff --git a/mars/dataframe/indexing/iloc.py b/mars/dataframe/indexing/iloc.py index eee323f12f..965a150442 100644 --- a/mars/dataframe/indexing/iloc.py +++ b/mars/dataframe/indexing/iloc.py @@ -86,7 +86,7 @@ def process_iloc_indexes(inp, indexes): index = index.fetch() except (RuntimeError, ValueError): raise NotImplementedError( - "indexer on axis columns cannot be " "non-executed tensor" + "indexer on axis columns cannot be non-executed tensor" ) if index.dtype != np.bool_: index = index.astype(np.int64) diff --git a/mars/dataframe/indexing/index_lib.py b/mars/dataframe/indexing/index_lib.py index 7d07034149..d0c6096803 100644 --- a/mars/dataframe/indexing/index_lib.py +++ b/mars/dataframe/indexing/index_lib.py @@ -1030,7 +1030,7 @@ def accept(cls, raw_index): def parse(self, raw_index, context: IndexHandlerContext) -> IndexInfo: if context.input_axis == 1: # pragma: no cover raise NotImplementedError( - "do not support tensor-based index " "on columns axis" + "do not support tensor-based index on columns axis" ) info = LabelFancyIndexInfo( IndexType.label_fancy_index, diff --git a/mars/dataframe/indexing/loc.py b/mars/dataframe/indexing/loc.py index 3bd64a5ad4..2f4939ad6e 100644 --- a/mars/dataframe/indexing/loc.py +++ b/mars/dataframe/indexing/loc.py @@ -60,7 +60,7 @@ def process_loc_indexes(inp, indexes): index = index.fetch() except (RuntimeError, ValueError): raise NotImplementedError( - "indexer on axis columns cannot be " "non-executed tensor" + "indexer on axis columns cannot be non-executed tensor" ) new_indexes.append(index) diff --git a/mars/dataframe/indexing/reindex.py b/mars/dataframe/indexing/reindex.py index 986cd841fc..a1b77845fb 100644 --- a/mars/dataframe/indexing/reindex.py +++ b/mars/dataframe/indexing/reindex.py @@ -760,7 +760,7 @@ def reindex(df_or_series, *args, **kwargs): columns = columns.fetch() except ValueError: raise NotImplementedError( - "`columns` need to be executed first " "if it's a Mars object" + "`columns` need to be executed first if it's a Mars object" ) elif columns is not None: columns = np.asarray(columns) diff --git a/mars/dataframe/indexing/tests/test_indexing_execution.py b/mars/dataframe/indexing/tests/test_indexing_execution.py index 5c49488787..68713d1a3c 100644 --- a/mars/dataframe/indexing/tests/test_indexing_execution.py +++ b/mars/dataframe/indexing/tests/test_indexing_execution.py @@ -997,9 +997,7 @@ def _execute_data_source(ctx, op): # pragma: no cover result = ctx[op.outputs[0].key] if not isinstance(usecols, list): if not isinstance(result, pd.Series): - raise RuntimeError( - "Out data should be a Series, " f"got {type(result)}" - ) + raise RuntimeError(f"Out data should be a Series, got {type(result)}") elif len(result.columns) > len(usecols): params = dict( (k, getattr(op, k, None)) diff --git a/mars/dataframe/merge/concat.py b/mars/dataframe/merge/concat.py index 417804bf13..4ef185ce9e 100644 --- 
a/mars/dataframe/merge/concat.py +++ b/mars/dataframe/merge/concat.py @@ -553,7 +553,7 @@ def _call_dataframes(self, objs): new_objs = [obj if obj.ndim == 2 else obj.to_frame() for obj in objs] else: # pragma: no cover raise NotImplementedError( - "Does not support concat dataframes " "which has different index" + "Does not support concat dataframes which has different index" ) shape = (objs[0].shape[0], col_length) diff --git a/mars/dataframe/reduction/core.py b/mars/dataframe/reduction/core.py index c61a69c30b..14d4050038 100644 --- a/mars/dataframe/reduction/core.py +++ b/mars/dataframe/reduction/core.py @@ -405,6 +405,8 @@ def _call_series(self, series): if func_name == "custom_reduction": empty_series = build_series(series, ensure_string=True) result_scalar = getattr(self, "custom_reduction").__call_agg__(empty_series) + if hasattr(result_scalar, "to_pandas"): # pragma: no cover + result_scalar = result_scalar.to_pandas() result_dtype = pd.Series(result_scalar).dtype else: result_dtype = _get_series_reduction_dtype( diff --git a/mars/deploy/oscar/cmdline.py b/mars/deploy/oscar/cmdline.py index e64019c57f..5e41a1c939 100644 --- a/mars/deploy/oscar/cmdline.py +++ b/mars/deploy/oscar/cmdline.py @@ -65,7 +65,7 @@ def config_args(self, parser): parser.add_argument( "-p", "--ports", - help="ports of the service, must equal to" "num of processes", + help="ports of the service, must equal to num of processes", ) parser.add_argument("-c", "--config", help="service configuration") parser.add_argument( diff --git a/mars/deploy/oscar/ray.py b/mars/deploy/oscar/ray.py index e11c513b74..a2e66cb4a0 100644 --- a/mars/deploy/oscar/ray.py +++ b/mars/deploy/oscar/ray.py @@ -327,7 +327,7 @@ async def new_cluster( ray.init(num_cpus=16 + worker_num * worker_cpu) ensure_isolation_created(kwargs) if kwargs: # pragma: no cover - raise TypeError(f"new_cluster got unexpected " f"arguments: {list(kwargs)}") + raise TypeError(f"new_cluster got unexpected arguments: {list(kwargs)}") n_supervisor_process = kwargs.get( "n_supervisor_process", DEFAULT_SUPERVISOR_SUB_POOL_NUM ) diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py index 57d819eb0d..a63570736d 100644 --- a/mars/deploy/oscar/session.py +++ b/mars/deploy/oscar/session.py @@ -853,7 +853,7 @@ async def init( if kwargs: # pragma: no cover unexpected_keys = ", ".join(list(kwargs.keys())) raise TypeError( - f"Oscar session got unexpected " f"arguments: {unexpected_keys}" + f"Oscar session got unexpected arguments: {unexpected_keys}" ) if urlparse(address).scheme == "http": @@ -1030,7 +1030,7 @@ def _get_to_fetch_tileable( elif isinstance(tileable.op, Fetch): break else: - raise ValueError(f"Cannot fetch unexecuted " f"tileable: {tileable!r}") + raise ValueError(f"Cannot fetch unexecuted tileable: {tileable!r}") if isinstance(tileable.op, Fetch): return tileable, indexes @@ -1083,7 +1083,7 @@ async def fetch(self, *tileables, **kwargs) -> list: if kwargs: # pragma: no cover unexpected_keys = ", ".join(list(kwargs.keys())) - raise TypeError(f"`fetch` got unexpected " f"arguments: {unexpected_keys}") + raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}") with enter_mode(build=True): chunks = [] @@ -1167,13 +1167,13 @@ async def fetch_infos(self, *tileables, fields, **kwargs) -> list: for field_name in fields: if field_name not in available_fields: # pragma: no cover raise TypeError( - f"`fetch_infos` got unexpected " f"field name: {field_name}" + f"`fetch_infos` got unexpected field name: {field_name}" ) fields = set(fields) 
if kwargs: # pragma: no cover unexpected_keys = ", ".join(list(kwargs.keys())) - raise TypeError(f"`fetch` got unexpected " f"arguments: {unexpected_keys}") + raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}") with enter_mode(build=True): chunks = [] diff --git a/mars/deploy/oscar/tests/session.py b/mars/deploy/oscar/tests/session.py index f3c7b2a776..1166ca4527 100644 --- a/mars/deploy/oscar/tests/session.py +++ b/mars/deploy/oscar/tests/session.py @@ -76,7 +76,7 @@ async def fetch(self, *tileables, **kwargs): extra_config = kwargs.pop("extra_config", dict()) if kwargs: unexpected_keys = ", ".join(list(kwargs.keys())) - raise TypeError(f"`fetch` got unexpected " f"arguments: {unexpected_keys}") + raise TypeError(f"`fetch` got unexpected arguments: {unexpected_keys}") self._check_options = self._extract_check_options(extra_config) results = await super().fetch(*tileables) diff --git a/mars/deploy/oscar/tests/test_checked_session.py b/mars/deploy/oscar/tests/test_checked_session.py index ec0f32141d..ab30f495db 100644 --- a/mars/deploy/oscar/tests/test_checked_session.py +++ b/mars/deploy/oscar/tests/test_checked_session.py @@ -59,9 +59,9 @@ def test_checked_session(setup): def test_check_task_preprocessor(setup): config = load_config(CONFIG_FILE) - config["task"]["task_preprocessor_cls"] = ( - "mars.deploy.oscar.tests." "test_checked_session.FakeCheckedTaskPreprocessor" - ) + config["task"][ + "task_preprocessor_cls" + ] = "mars.deploy.oscar.tests.test_checked_session.FakeCheckedTaskPreprocessor" sess = new_test_session(default=True, config=config) @@ -79,9 +79,9 @@ def test_check_task_preprocessor(setup): def test_check_subtask_processor(setup): config = load_config(CONFIG_FILE) - config["subtask"]["subtask_processor_cls"] = ( - "mars.deploy.oscar.tests." "test_checked_session.FakeCheckedSubtaskProcessor" - ) + config["subtask"][ + "subtask_processor_cls" + ] = "mars.deploy.oscar.tests.test_checked_session.FakeCheckedSubtaskProcessor" sess = new_test_session(default=True, config=config) diff --git a/mars/learn/cluster/_k_means_elkan_iter.py b/mars/learn/cluster/_k_means_elkan_iter.py index 9be26216a2..f01088d40c 100644 --- a/mars/learn/cluster/_k_means_elkan_iter.py +++ b/mars/learn/cluster/_k_means_elkan_iter.py @@ -193,9 +193,7 @@ def execute(cls, ctx, op): with device(device_id): if xp is cp: # pragma: no cover - raise NotImplementedError( - "cannot support init_bounds " "for kmeans elkan" - ) + raise NotImplementedError("cannot support init_bounds for kmeans elkan") n_samples = x.shape[0] n_clusters = op.n_clusters diff --git a/mars/learn/cluster/_kmeans.py b/mars/learn/cluster/_kmeans.py index 436dc423c6..148995bff4 100644 --- a/mars/learn/cluster/_kmeans.py +++ b/mars/learn/cluster/_kmeans.py @@ -766,7 +766,7 @@ def _check_params(self, X): # n_clusters if X.shape[0] < self.n_clusters: raise ValueError( - f"n_samples={X.shape[0]} should be >= " f"n_clusters={self.n_clusters}." + f"n_samples={X.shape[0]} should be >= n_clusters={self.n_clusters}." ) # tol diff --git a/mars/learn/contrib/utils.py b/mars/learn/contrib/utils.py index 0f54180f95..1590768c4b 100644 --- a/mars/learn/contrib/utils.py +++ b/mars/learn/contrib/utils.py @@ -20,7 +20,7 @@ def make_import_error_func(package_name): def _func(*_, **__): # pragma: no cover raise ImportError( - f"Cannot import {package_name}, please reinstall " f"that package." + f"Cannot import {package_name}, please reinstall that package." 
) return _func diff --git a/mars/learn/datasets/samples_generator.py b/mars/learn/datasets/samples_generator.py index 26febc1654..62df2c0914 100644 --- a/mars/learn/datasets/samples_generator.py +++ b/mars/learn/datasets/samples_generator.py @@ -177,9 +177,7 @@ def make_classification( " be smaller or equal 2 ** n_informative" ) if weights and len(weights) not in [n_classes, n_classes - 1]: - raise ValueError( - "Weights specified but incompatible with number " "of classes." - ) + raise ValueError("Weights specified but incompatible with number of classes.") n_useless = n_features - n_informative - n_redundant - n_repeated n_clusters = n_classes * n_clusters_per_class @@ -506,7 +504,7 @@ def make_blobs( assert len(centers) == n_centers except TypeError: raise ValueError( - "Parameter `centers` must be array-like. " f"Got {centers!r} instead" + f"Parameter `centers` must be array-like. Got {centers!r} instead" ) except AssertionError: raise ValueError( diff --git a/mars/learn/datasets/tests/test_samples_generator.py b/mars/learn/datasets/tests/test_samples_generator.py index e75f78b48b..d21fadfe54 100644 --- a/mars/learn/datasets/tests/test_samples_generator.py +++ b/mars/learn/datasets/tests/test_samples_generator.py @@ -149,7 +149,7 @@ def test_make_classification_informative_features(setup): assert_array_almost_equal( np.bincount(y) / len(y) // weights, [1] * n_classes, - err_msg="Wrong number of samples " "per class", + err_msg="Wrong number of samples per class", ) # Ensure on vertices of hypercube @@ -160,7 +160,7 @@ def test_make_classification_informative_features(setup): np.abs(centroid) / class_sep, np.ones(n_informative), decimal=5, - err_msg="Clusters are not " "centered on hypercube " "vertices", + err_msg="Clusters are not centered on hypercube vertices", ) else: assert_raises( @@ -317,7 +317,7 @@ def test_make_blobs_error(setup): centers=centers, cluster_std=cluster_stds[:-1], ) - wrong_type_msg = "Parameter `centers` must be array-like. " f"Got {3!r} instead" + wrong_type_msg = f"Parameter `centers` must be array-like. 
Got {3!r} instead" assert_raise_message(ValueError, wrong_type_msg, make_blobs, n_samples, centers=3) diff --git a/mars/learn/decomposition/_pca.py b/mars/learn/decomposition/_pca.py index 28fcc81c97..728e3fd078 100644 --- a/mars/learn/decomposition/_pca.py +++ b/mars/learn/decomposition/_pca.py @@ -446,7 +446,7 @@ def _fit_full(self, X, n_components, session=None, run_kwargs=None): if n_components == "mle": if n_samples < n_features: raise ValueError( - "n_components='mle' is only supported " "if n_samples >= n_features" + "n_components='mle' is only supported if n_samples >= n_features" ) elif not 0 <= n_components <= min(n_samples, n_features): raise ValueError( diff --git a/mars/learn/decomposition/_truncated_svd.py b/mars/learn/decomposition/_truncated_svd.py index 59e208d873..f3322ba40c 100644 --- a/mars/learn/decomposition/_truncated_svd.py +++ b/mars/learn/decomposition/_truncated_svd.py @@ -189,7 +189,7 @@ def fit_transform(self, X, y=None, session=None): n_features = X.shape[1] if k >= n_features: raise ValueError( - "n_components must be < n_features; " f"got {k} >= {n_features}" + f"n_components must be < n_features; got {k} >= {n_features}" ) U, Sigma, VT = randomized_svd( X, self.n_components, n_iter=self.n_iter, random_state=random_state diff --git a/mars/learn/decomposition/tests/test_pca.py b/mars/learn/decomposition/tests/test_pca.py index 8bd9e5bf7b..48c0489a75 100644 --- a/mars/learn/decomposition/tests/test_pca.py +++ b/mars/learn/decomposition/tests/test_pca.py @@ -435,7 +435,7 @@ def test_n_components_mle(setup): n_components_dict[solver] = pca.n_components_ else: # arpack/randomized solver error_message = ( - "n_components='mle' cannot be a string with " f"svd_solver='{solver}'" + f"n_components='mle' cannot be a string with svd_solver='{solver}'" ) assert_raise_message(ValueError, error_message, pca.fit, X) assert n_components_dict["auto"] == n_components_dict["full"] diff --git a/mars/learn/glm/_logistic.py b/mars/learn/glm/_logistic.py index 3c99426d1a..bdf3b68062 100644 --- a/mars/learn/glm/_logistic.py +++ b/mars/learn/glm/_logistic.py @@ -51,7 +51,7 @@ def _check_multi_class(multi_class, solver, n_classes): return "multinomial" raise ValueError( - "multi_class should be 'multinomial', " "'ovr' or 'auto'. Got %s." % multi_class + "multi_class should be 'multinomial', 'ovr' or 'auto'. Got %s." 
% multi_class ) diff --git a/mars/learn/metrics/_regresssion.py b/mars/learn/metrics/_regresssion.py index cc6a2c89b4..ad611f9e64 100644 --- a/mars/learn/metrics/_regresssion.py +++ b/mars/learn/metrics/_regresssion.py @@ -90,10 +90,10 @@ def _check_reg_targets(y_true, y_pred, multioutput, dtype="numeric"): elif multioutput is not None: multioutput = check_array(multioutput, ensure_2d=False) if n_outputs == 1: - raise ValueError("Custom weights are useful only in " "multi-output cases.") + raise ValueError("Custom weights are useful only in multi-output cases.") elif n_outputs != len(multioutput): raise ValueError( - ("There must be equally many custom weights " "(%d) as outputs (%d).") + ("There must be equally many custom weights (%d) as outputs (%d).") % (len(multioutput), n_outputs) ) y_type = "continuous" if n_outputs == 1 else "continuous-multioutput" diff --git a/mars/learn/metrics/pairwise/manhattan.py b/mars/learn/metrics/pairwise/manhattan.py index 28a91dcde4..4920a3803b 100644 --- a/mars/learn/metrics/pairwise/manhattan.py +++ b/mars/learn/metrics/pairwise/manhattan.py @@ -120,7 +120,7 @@ def execute(cls, ctx, op): else: # pragma: no cover # we cannot support sparse raise NotImplementedError( - "cannot support calculate manhattan " "distances on GPU" + "cannot support calculate manhattan distances on GPU" ) diff --git a/mars/learn/model_selection/_split.py b/mars/learn/model_selection/_split.py index e529cde944..f142c634a0 100644 --- a/mars/learn/model_selection/_split.py +++ b/mars/learn/model_selection/_split.py @@ -125,7 +125,7 @@ def train_test_split(*arrays, **options): if shuffle is False: if stratify is not None: # pragma: no cover raise ValueError( - "Stratified train/test split is not implemented for " "shuffle=False" + "Stratified train/test split is not implemented for shuffle=False" ) iterables = ((a[:n_train], a[n_train : n_train + n_test]) for a in arrays) @@ -305,7 +305,7 @@ def __init__(self, n_splits, *, shuffle, random_state): ) if not isinstance(shuffle, bool): - raise TypeError("shuffle must be True or False;" " got {0}".format(shuffle)) + raise TypeError("shuffle must be True or False; got {0}".format(shuffle)) if not shuffle and random_state is not None: # None is the default raise ValueError( diff --git a/mars/learn/neighbors/base.py b/mars/learn/neighbors/base.py index 796bf5c1e6..702920540e 100644 --- a/mars/learn/neighbors/base.py +++ b/mars/learn/neighbors/base.py @@ -164,7 +164,7 @@ def _fit(self, X, session=None, run_kwargs=None): if self.metric == "minkowski": p = self.effective_metric_params_.pop("p", 2) if p < 1: # pragma: no cover - raise ValueError("p must be greater than one " "for minkowski metric") + raise ValueError("p must be greater than one for minkowski metric") elif p == 1: self.effective_metric_ = "manhattan" elif p == 2: @@ -200,7 +200,7 @@ def _fit(self, X, session=None, run_kwargs=None): if X.issparse(): if self.algorithm not in ("auto", "brute"): - warnings.warn("cannot use tree with sparse input: " "using brute force") + warnings.warn("cannot use tree with sparse input: using brute force") if self.effective_metric_ not in VALID_METRICS_SPARSE[ "brute" ] and not callable(self.effective_metric_): diff --git a/mars/learn/neighbors/tree.py b/mars/learn/neighbors/tree.py index 4e1e6986ab..d70f204bcc 100644 --- a/mars/learn/neighbors/tree.py +++ b/mars/learn/neighbors/tree.py @@ -103,7 +103,7 @@ def tile(cls, op): def execute(cls, ctx, op): if op.gpu: # pragma: no cover raise NotImplementedError( - "Does not support tree-based " 
"nearest neighbors on GPU" + "Does not support tree-based nearest neighbors on GPU" ) a = ctx[op.input.key] @@ -263,7 +263,7 @@ def tile(cls, op): def execute(cls, ctx, op): if op.gpu: # pragma: no cover raise NotImplementedError( - "Does not support tree-based " "nearest neighbors on GPU" + "Does not support tree-based nearest neighbors on GPU" ) x = ctx[op.input.key] diff --git a/mars/learn/preprocessing/_label.py b/mars/learn/preprocessing/_label.py index 4dc120a16f..f2e05656b4 100644 --- a/mars/learn/preprocessing/_label.py +++ b/mars/learn/preprocessing/_label.py @@ -310,7 +310,7 @@ def fit(self, y, session=None, run_kwargs=None): ) if "multioutput" in self.y_type_: raise ValueError( - "Multioutput target data is not supported with " "label binarization" + "Multioutput target data is not supported with label binarization" ) if _num_samples(y) == 0: # pragma: no cover raise ValueError("y has 0 samples: %r" % y) @@ -375,7 +375,7 @@ def transform(self, y, session=None, run_kwargs=None): ) y_is_multilabel = target.startswith("multilabel") if y_is_multilabel and not self.y_type_.startswith("multilabel"): - raise ValueError("The object was not fitted with multilabel" " input.") + raise ValueError("The object was not fitted with multilabel input.") return label_binarize( y, @@ -547,7 +547,7 @@ def tile(cls, op: "LabelBinarize"): y_type = y_type.item() if hasattr(y_type, "item") else y_type if "multioutput" in y_type: raise ValueError( - "Multioutput target data is not supported with label " "binarization" + "Multioutput target data is not supported with label binarization" ) if y_type == "unknown": raise ValueError("The type of target data is not known") @@ -588,7 +588,7 @@ def tile(cls, op: "LabelBinarize"): out_shape = y.shape else: raise ValueError( - "%s target data is not supported with label " "binarization" % y_type + "%s target data is not supported with label binarization" % y_type ) out_chunks = [] @@ -671,7 +671,7 @@ def execute(cls, ctx: Union[dict, Context], op: "LabelBinarize"): Y.data = data else: # pragma: no cover raise ValueError( - "%s target data is not supported with label " "binarization" % y_type + "%s target data is not supported with label binarization" % y_type ) if not sparse_output: @@ -827,7 +827,7 @@ def _inverse_binarize_thresholding( if output_type != "binary" and y.shape[1] != len(classes): raise ValueError( - "The number of class is not equal to the number of " "dimension of y." + "The number of class is not equal to the number of dimension of y." 
) classes = np.asarray(classes) diff --git a/mars/learn/proxima/simple_index/builder.py b/mars/learn/proxima/simple_index/builder.py index d5bbc30e40..b835f64e99 100644 --- a/mars/learn/proxima/simple_index/builder.py +++ b/mars/learn/proxima/simple_index/builder.py @@ -186,9 +186,9 @@ def _get_atleast_topk_nsplit(cls, nsplit, topk): elif cur >= topk: new_nsplit.append(cur) new_nsplit = tuple(new_nsplit) - assert sum(new_nsplit) == sum(nsplit), ( - f"sum of nsplit not equal, " f"old: {nsplit}, new: {new_nsplit}" - ) + assert sum(new_nsplit) == sum( + nsplit + ), f"sum of nsplit not equal, old: {nsplit}, new: {new_nsplit}" return new_nsplit diff --git a/mars/learn/utils/shuffle.py b/mars/learn/utils/shuffle.py index 34e045632a..7cdd6ea0fe 100644 --- a/mars/learn/utils/shuffle.py +++ b/mars/learn/utils/shuffle.py @@ -470,7 +470,7 @@ def shuffle(*arrays, **options): random_state = check_random_state(options.pop("random_state", None)).to_numpy() if options: raise TypeError( - "shuffle() got an unexpected " f"keyword argument {next(iter(options))}" + f"shuffle() got an unexpected keyword argument {next(iter(options))}" ) max_ndim = max(ar.ndim for ar in arrays) diff --git a/mars/learn/utils/validation.py b/mars/learn/utils/validation.py index 0e56e1d31c..08839e888b 100644 --- a/mars/learn/utils/validation.py +++ b/mars/learn/utils/validation.py @@ -62,7 +62,7 @@ def _num_samples(x): if hasattr(x.op, "data") and x.op.data is not None: x = np.asarray(x.op.data) raise TypeError( - f"Singleton array {x!r} cannot be considered" " a valid collection." + f"Singleton array {x!r} cannot be considered a valid collection." ) # Check that shape is returning an integer or default to len if isinstance(x.shape[0], numbers.Integral): diff --git a/mars/lib/filesystem/arrow.py b/mars/lib/filesystem/arrow.py index 312249c456..6f30cf65da 100644 --- a/mars/lib/filesystem/arrow.py +++ b/mars/lib/filesystem/arrow.py @@ -87,7 +87,7 @@ def delete(self, path: path_type, recursive: bool = False): raise OSError(f"[Errno 66] Directory not empty: '{path}'") self._arrow_fs.delete_dir(path) else: # pragma: no cover - raise TypeError(f"path({path}) to delete " f"must be a file or directory") + raise TypeError(f"path({path}) to delete must be a file or directory") @implements(FileSystem.rename) def rename(self, path: path_type, new_path: path_type): @@ -141,7 +141,7 @@ def open(self, path: path_type, mode: str = "rb") -> Union[BinaryIO, TextIO]: is_binary = mode.endswith("b") if not is_binary: # pragma: no cover raise ValueError( - f"mode can only be binary for " f"arrow based filesystem, got {mode}" + f"mode can only be binary for arrow based filesystem, got {mode}" ) mode = mode.rstrip("b") if mode == "w": diff --git a/mars/lib/uhashring/ring.py b/mars/lib/uhashring/ring.py index 90c409aa51..c1201ca26e 100644 --- a/mars/lib/uhashring/ring.py +++ b/mars/lib/uhashring/ring.py @@ -50,7 +50,7 @@ def _configure_nodes(self, nodes): nodes = [nodes] elif not isinstance(nodes, (dict, list)): raise ValueError( - "nodes configuration should be a list or a dict," f" got {type(nodes)}" + f"nodes configuration should be a list or a dict, got {type(nodes)}" ) conf_changed = False diff --git a/mars/oscar/backends/communication/dummy.py b/mars/oscar/backends/communication/dummy.py index abb8d09cce..9f06b827ea 100644 --- a/mars/oscar/backends/communication/dummy.py +++ b/mars/oscar/backends/communication/dummy.py @@ -206,7 +206,7 @@ async def connect( server = DummyServer.get_instance(dest_address) if server is None: # pragma: no cover raise 
RuntimeError( - "DummyServer needs to be created " "first before DummyClient" + "DummyServer needs to be created first before DummyClient" ) if server.stopped: # pragma: no cover raise ConnectionError("Dummy server closed") diff --git a/mars/oscar/backends/communication/socket.py b/mars/oscar/backends/communication/socket.py index 0904621535..448222f30b 100644 --- a/mars/oscar/backends/communication/socket.py +++ b/mars/oscar/backends/communication/socket.py @@ -252,9 +252,7 @@ async def connect( @lru_cache(100) def _gen_unix_socket_default_path(process_index): - return ( - f"{TEMPDIR}/mars/" f"{md5(to_binary(str(process_index))).hexdigest()}" - ) # nosec + return f"{TEMPDIR}/mars/{md5(to_binary(str(process_index))).hexdigest()}" # nosec @register_server @@ -352,7 +350,7 @@ async def connect( (reader, writer) = await asyncio.open_unix_connection(path, **kwargs) except FileNotFoundError: raise ConnectionRefusedError( - "Cannot connect unix socket " "due to file not exists" + "Cannot connect unix socket due to file not exists" ) channel = SocketChannel( reader, writer, local_address=local_address, dest_address=dest_address diff --git a/mars/oscar/backends/config.py b/mars/oscar/backends/config.py index bf97a61add..99195411f2 100644 --- a/mars/oscar/backends/config.py +++ b/mars/oscar/backends/config.py @@ -78,7 +78,7 @@ def get_process_index(self, external_address: str): if external_address in conf["external_address"]: return process_index raise ValueError( - f"Cannot get process_index " f"for {external_address}" + f"Cannot get process_index for {external_address}" ) # pragma: no cover def get_external_addresses(self, label=None) -> List[str]: diff --git a/mars/oscar/backends/pool.py b/mars/oscar/backends/pool.py index a7d60258fb..c9d3ccb76c 100644 --- a/mars/oscar/backends/pool.py +++ b/mars/oscar/backends/pool.py @@ -500,7 +500,7 @@ async def create_actor(self, message: CreateActorMessage) -> ResultMessageType: actor_id = message.actor_id if actor_id in self._actors: raise ActorAlreadyExist( - f"Actor {actor_id} already exist, " f"cannot create" + f"Actor {actor_id} already exist, cannot create" ) actor = message.actor_cls(*message.args, **message.kwargs) @@ -1269,10 +1269,10 @@ async def create_actor_pool( n_process = multiprocessing.cpu_count() if labels and len(labels) != n_process + 1: raise ValueError( - f"`labels` should be of size {n_process + 1}, " f"got {len(labels)}" + f"`labels` should be of size {n_process + 1}, got {len(labels)}" ) if envs and len(envs) != n_process: - raise ValueError(f"`envs` should be of size {n_process}, " f"got {len(envs)}") + raise ValueError(f"`envs` should be of size {n_process}, got {len(envs)}") if auto_recover is True: auto_recover = "actor" if auto_recover not in ("actor", "process", False): diff --git a/mars/remote/run_script.py b/mars/remote/run_script.py index edc4dceb5f..146863107d 100644 --- a/mars/remote/run_script.py +++ b/mars/remote/run_script.py @@ -175,7 +175,7 @@ def execute(cls, ctx, op): def _extract_inputs(data: Dict[str, TileableType] = None) -> List[TileableType]: if data is not None and not isinstance(data, dict): raise TypeError( - "`data` must be a dict whose key is " "variable name and value is data" + "`data` must be a dict whose key is variable name and value is data" ) inputs = [] diff --git a/mars/serialization/serializables/field_type.py b/mars/serialization/serializables/field_type.py index 2d4721bfca..c48f1c56a1 100644 --- a/mars/serialization/serializables/field_type.py +++ 
b/mars/serialization/serializables/field_type.py @@ -285,7 +285,7 @@ def valid_types(self) -> Tuple[Type, ...]: # pragma: no cover def validate(self, value): if value is not None and not callable(value): - raise TypeError(f"value should be a function, " f"got {type(value)}") + raise TypeError(f"value should be a function, got {type(value)}") class NamedtupleType(SingletonFieldType): @@ -302,7 +302,7 @@ def valid_types(self) -> Tuple[Type, ...]: def validate(self, value): if not (isinstance(value, self.valid_types) and hasattr(value, "_fields")): raise TypeError( - f"value should be instance of namedtuple, " f"got {type(value)}" + f"value should be instance of namedtuple, got {type(value)}" ) @@ -376,7 +376,7 @@ def validate(self, value): return if not isinstance(value, self.valid_types): raise TypeError( - f"value should be instance of {self.valid_types}, " f"got {type(value)}" + f"value should be instance of {self.valid_types}, got {type(value)}" ) if self.is_homogeneous(): field_type: AbstractFieldType = self._field_types[0] diff --git a/mars/services/lifecycle/supervisor/tracker.py b/mars/services/lifecycle/supervisor/tracker.py index f527b0aff3..a61f112fdd 100644 --- a/mars/services/lifecycle/supervisor/tracker.py +++ b/mars/services/lifecycle/supervisor/tracker.py @@ -163,9 +163,7 @@ def get_all_chunk_ref_counts(self) -> Dict[str, int]: def incref_tileables(self, tileable_keys: List[str]): for tileable_key in tileable_keys: if tileable_key not in self._tileable_key_to_chunk_keys: - raise TileableNotTracked( - f"tileable {tileable_key} " f"not tracked before" - ) + raise TileableNotTracked(f"tileable {tileable_key} not tracked before") self._tileable_ref_counts[tileable_key] += 1 incref_chunk_keys = self._tileable_key_to_chunk_keys[tileable_key] # incref chunks for this tileable @@ -180,9 +178,7 @@ async def decref_tileables(self, tileable_keys: List[str]): decref_chunk_keys = [] for tileable_key in tileable_keys: if tileable_key not in self._tileable_key_to_chunk_keys: - raise TileableNotTracked( - f"tileable {tileable_key} " f"not tracked before" - ) + raise TileableNotTracked(f"tileable {tileable_key} not tracked before") self._tileable_ref_counts[tileable_key] -= 1 decref_chunk_keys.extend(self._tileable_key_to_chunk_keys[tileable_key]) diff --git a/mars/services/session/api/oscar.py b/mars/services/session/api/oscar.py index af0681311e..848550fd9f 100644 --- a/mars/services/session/api/oscar.py +++ b/mars/services/session/api/oscar.py @@ -34,9 +34,7 @@ def __init__( @alru_cache(cache_exceptions=False) async def create(cls, address: str, **kwargs) -> "SessionAPI": if kwargs: # pragma: no cover - raise TypeError( - f"SessionAPI.create " f"got unknown arguments: {list(kwargs)}" - ) + raise TypeError(f"SessionAPI.create got unknown arguments: {list(kwargs)}") session_manager = await mo.actor_ref(address, SessionManagerActor.default_uid()) return SessionAPI(address, session_manager) @@ -201,9 +199,7 @@ class MockSessionAPI(SessionAPI): async def create(cls, address: str, **kwargs) -> "SessionAPI": session_id = kwargs.pop("session_id") if kwargs: # pragma: no cover - raise TypeError( - f"SessionAPI.create " f"got unknown arguments: {list(kwargs)}" - ) + raise TypeError(f"SessionAPI.create got unknown arguments: {list(kwargs)}") session_manager = await mo.create_actor( SessionManagerActor, address=address, uid=SessionManagerActor.default_uid() diff --git a/mars/services/storage/core.py b/mars/services/storage/core.py index 98776dde81..7e8e33ee70 100644 --- a/mars/services/storage/core.py 
+++ b/mars/services/storage/core.py @@ -117,7 +117,7 @@ def update_quota(self, size: int): def request_quota(self, size: int) -> bool: if self._total_size is not None and size > self._total_size: # pragma: no cover raise StorageFull( - f"Request size {size} is larger " f"than total size {self._total_size}" + f"Request size {size} is larger than total size {self._total_size}" ) if self._total_size is not None and self._used_size + size > self._total_size: logger.debug( @@ -131,7 +131,7 @@ def request_quota(self, size: int) -> bool: else: self._used_size += size logger.debug( - "Request %s bytes of %s, used size now is %s," "total size is %s", + "Request %s bytes of %s, used size now is %s, total size is %s", size, self._level, self._used_size, @@ -142,7 +142,7 @@ def request_quota(self, size: int) -> bool: def release_quota(self, size: int): self._used_size -= size logger.debug( - "Release %s bytes of %s, used size now is %s," "total size is %s", + "Release %s bytes of %s, used size now is %s, total size is %s", size, self._level, self._used_size, @@ -445,7 +445,7 @@ async def _setup_storage_backends(self): for client, storage_band in zip(clients, storage_bands): if client.level & level: logger.debug( - "Create quota manager for %s," " total size is %s", + "Create quota manager for %s, total size is %s", level, client.size, ) diff --git a/mars/services/storage/spill.py b/mars/services/storage/spill.py index 49e06d695e..52586dee73 100644 --- a/mars/services/storage/spill.py +++ b/mars/services/storage/spill.py @@ -171,7 +171,7 @@ async def spill( multiplier=1.1, ): logger.debug( - "%s is full, need to spill %s bytes, " "multiplier is %s", + "%s is full, need to spill %s bytes, multiplier is %s", level, request_size, multiplier, @@ -183,7 +183,7 @@ async def spill( level, band_name, request_size ) logger.debug( - "Decide to spill %s bytes, " "data keys are %s", sum(spill_sizes), spill_keys + "Decide to spill %s bytes, data keys are %s", sum(spill_sizes), spill_keys ) for (session_id, key), size in zip(spill_keys, spill_sizes): diff --git a/mars/services/subtask/worker/processor.py b/mars/services/subtask/worker/processor.py index 3ee90a22b8..587bbf2e56 100644 --- a/mars/services/subtask/worker/processor.py +++ b/mars/services/subtask/worker/processor.py @@ -136,7 +136,7 @@ async def _load_input_data(self): accept_nones.append(False) if keys: logger.debug( - "Start getting input data, keys: %s, " "subtask id: %s", + "Start getting input data, keys: %s, subtask id: %s", keys, self.subtask.subtask_id, ) @@ -149,7 +149,7 @@ async def _load_input_data(self): } ) logger.debug( - "Finish getting input data keys: %s, " "subtask id: %s", + "Finish getting input data keys: %s, subtask id: %s", keys, self.subtask.subtask_id, ) @@ -200,7 +200,7 @@ async def _execute_graph(self, chunk_graph: ChunkGraph): # since `op.execute` may be a time-consuming operation, # we make it run in a thread pool to not block current thread. 
logger.debug( - "Start executing operand: %s," "chunk: %s, subtask id: %s", + "Start executing operand: %s, chunk: %s, subtask id: %s", chunk.op, chunk, self.subtask.subtask_id, @@ -222,7 +222,7 @@ def cb(fut): try: await to_wait logger.debug( - "Finish executing operand: %s," "chunk: %s, subtask id: %s", + "Finish executing operand: %s, chunk: %s, subtask id: %s", chunk.op, chunk, self.subtask.subtask_id, @@ -239,7 +239,7 @@ def cb(fut): await future # if cancelled, stop next computation logger.debug( - "Cancelled operand: %s, chunk: %s, " "subtask id: %s", + "Cancelled operand: %s, chunk: %s, subtask id: %s", chunk.op, chunk, self.subtask.subtask_id, @@ -400,7 +400,7 @@ async def _store_meta( ) ) logger.debug( - "Start storing chunk metas for data keys: %s, " "subtask id: %s", + "Start storing chunk metas for data keys: %s, subtask id: %s", set_meta_keys, self.subtask.subtask_id, ) @@ -410,7 +410,7 @@ async def _store_meta( async def set_chunks_meta(): await self._meta_api.set_chunk_meta.batch(*set_chunk_metas) logger.debug( - "Finish store chunk metas for data keys: %s, " "subtask id: %s", + "Finish store chunk metas for data keys: %s, subtask id: %s", set_meta_keys, self.subtask.subtask_id, ) diff --git a/mars/services/web/tests/test_core.py b/mars/services/web/tests/test_core.py index 220f4cff1e..4803f89200 100644 --- a/mars/services/web/tests/test_core.py +++ b/mars/services/web/tests/test_core.py @@ -119,12 +119,12 @@ def url_recorder(request): assert res.body.decode() == "get_sub_value_test_id_sub_tid" res = await client.fetch( - f"http://localhost:{web_port}/api/test/test_id/" f"subtest/sub_tid?action=a1" + f"http://localhost:{web_port}/api/test/test_id/subtest/sub_tid?action=a1" ) assert res.body.decode() == "get_sub_value_test_id_sub_tid_action1" res = await client.fetch( - f"http://localhost:{web_port}/api/test/test_id/" f"subtest/sub_tid?action=a2" + f"http://localhost:{web_port}/api/test/test_id/subtest/sub_tid?action=a2" ) assert res.body.decode() == "get_sub_value_test_id_sub_tid_action2" diff --git a/mars/storage/cuda.py b/mars/storage/cuda.py index 958dbc3526..567777081f 100644 --- a/mars/storage/cuda.py +++ b/mars/storage/cuda.py @@ -70,8 +70,8 @@ def _initialize_read(self): self._offset = 0 self._has_read_headers = False self._buffers = [] - headers, buffers = _id_to_buffers[self._object_id] - self._headers = headers = headers.copy() + (metas, serialized), buffers = _id_to_buffers[self._object_id] + self._headers = headers = (metas.copy(), serialized) buffer_types = [] for buf in buffers: if isinstance(buf, cupy.ndarray): @@ -90,7 +90,7 @@ def _initialize_read(self): size = getattr(buf, "size", len(buf)) self._buffers.append(buf) buffer_types.append(["memory", size]) - headers["buffer_types"] = buffer_types + headers[0]["buffer_types"] = buffer_types def _initialize_write(self): self._had_write_headers = False @@ -143,7 +143,7 @@ def write(self, content): if not self._has_write_headers: self._headers = headers = pickle.loads(content) - buffer_types = headers["buffer_types"] + buffer_types = headers[0]["buffer_types"] for buffer_type, size in buffer_types: if buffer_type == "cuda": self._buffers.append(Buffer.empty(size)) @@ -153,7 +153,7 @@ def write(self, content): return cur_buf = self._buffers[self._cur_buffer_index] - cur_buf_size = self._headers["buffer_types"][self._cur_buffer_index][1] + cur_buf_size = self._headers[0]["buffer_types"][self._cur_buffer_index][1] if isinstance(cur_buf, Buffer): cur_cupy_memory = UnownedMemory(cur_buf.ptr, len(cur_buf), cur_buf) 
cupy_pointer = MemoryPointer(cur_cupy_memory, self._offset) @@ -188,7 +188,7 @@ def _read_close(self): def _write_close(self): headers = self._headers - headers.pop("buffer_types") + headers[0].pop("buffer_types") # hold cuda buffers _id_to_buffers[self._object_id] = headers, self._buffers diff --git a/mars/tensor/base/broadcast_to.py b/mars/tensor/base/broadcast_to.py index 9a06b27c15..7a54bae9c1 100644 --- a/mars/tensor/base/broadcast_to.py +++ b/mars/tensor/base/broadcast_to.py @@ -129,7 +129,7 @@ def broadcast_to(tensor, shape): if any(np.isnan(s) for s in tensor.shape): raise ValueError( - "input tensor has unknown shape, " "need to call `.execute()` first" + "input tensor has unknown shape, need to call `.execute()` first" ) if tensor.shape == shape: diff --git a/mars/tensor/base/delete.py b/mars/tensor/base/delete.py index 6575bbd567..533295135c 100644 --- a/mars/tensor/base/delete.py +++ b/mars/tensor/base/delete.py @@ -219,7 +219,7 @@ def delete(arr, obj, axis=None): arr = astensor(arr) if getattr(obj, "ndim", 0) > 1: # pragma: no cover raise ValueError( - "index array argument obj to insert must be " "one dimensional or scalar" + "index array argument obj to insert must be one dimensional or scalar" ) if axis is None: diff --git a/mars/tensor/base/insert.py b/mars/tensor/base/insert.py index e58acb0265..7fcdb8f8f9 100644 --- a/mars/tensor/base/insert.py +++ b/mars/tensor/base/insert.py @@ -360,7 +360,7 @@ def insert(arr, obj, values, axis=None): arr = astensor(arr) if getattr(obj, "ndim", 0) > 1: # pragma: no cover raise ValueError( - "index array argument obj to insert must be " "one dimensional or scalar" + "index array argument obj to insert must be one dimensional or scalar" ) if axis is None: diff --git a/mars/tensor/base/partition.py b/mars/tensor/base/partition.py index 821c33c639..e06474c3a4 100644 --- a/mars/tensor/base/partition.py +++ b/mars/tensor/base/partition.py @@ -672,7 +672,7 @@ def _validate_partition_arguments(a, kth, axis, kind, order, kw): need_align = kw.pop("need_align", None) if len(kw) > 0: raise TypeError( - "partition() got an unexpected keyword " f"argument '{next(iter(kw))}'" + f"partition() got an unexpected keyword argument '{next(iter(kw))}'" ) return a, kth, axis, kind, order, need_align diff --git a/mars/tensor/base/sort.py b/mars/tensor/base/sort.py index 12302e5194..e89077f4b5 100644 --- a/mars/tensor/base/sort.py +++ b/mars/tensor/base/sort.py @@ -311,12 +311,12 @@ def _validate_sort_psrs_kinds(psrs_kinds): continue else: raise ValueError( - "3rd element of psrs_kinds " "should be specified" + "3rd element of psrs_kinds should be specified" ) upper_psrs_kind = psrs_kind.upper() if upper_psrs_kind not in _AVAILABLE_KINDS: raise ValueError( - f"{psrs_kind} is an unrecognized kind " "in psrs_kinds" + f"{psrs_kind} is an unrecognized kind in psrs_kinds" ) else: raise TypeError("psrs_kinds should be list or tuple") @@ -343,7 +343,7 @@ def _validate_sort_arguments(a, axis, kind, parallel_kind, psrs_kinds, order): parallel_kind = parallel_kind.upper() if parallel_kind not in {"PSRS"}: raise ValueError( - f"{raw_parallel_kind} is an unrecognized kind of " "parallel sort" + f"{raw_parallel_kind} is an unrecognized kind of parallel sort" ) order = validate_order(a.dtype, order) @@ -489,9 +489,7 @@ def sort( """ need_align = kw.pop("need_align", None) if len(kw) > 0: - raise TypeError( - "sort() got an unexpected keyword " f"argument '{next(iter(kw))}'" - ) + raise TypeError(f"sort() got an unexpected keyword argument '{next(iter(kw))}'") a, axis, kind, 
parallel_kind, psrs_kinds, order = _validate_sort_arguments( a, axis, kind, parallel_kind, psrs_kinds, order ) diff --git a/mars/tensor/datasource/from_tiledb.py b/mars/tensor/datasource/from_tiledb.py index 7855c95cc0..c464b33e66 100644 --- a/mars/tensor/datasource/from_tiledb.py +++ b/mars/tensor/datasource/from_tiledb.py @@ -172,14 +172,14 @@ def fromtiledb(uri, ctx=None, key=None, timestamp=None, gpu=None): if tiledb_arr.nattr > 1: raise NotImplementedError( - "Does not supported TileDB array schema " "with more than 1 attr" + "Does not supported TileDB array schema with more than 1 attr" ) tiledb_dim_starts = tuple( tiledb_arr.domain.dim(j).domain[0].item() for j in range(tiledb_arr.ndim) ) if any(isinstance(s, float) for s in tiledb_dim_starts): raise ValueError( - "Does not support TileDB array schema " "whose dimensions has float domain" + "Does not support TileDB array schema whose dimensions has float domain" ) dtype = tiledb_arr.attr(0).dtype diff --git a/mars/tensor/einsum/einsumfunc.py b/mars/tensor/einsum/einsumfunc.py index c983372604..c59baedbaa 100644 --- a/mars/tensor/einsum/einsumfunc.py +++ b/mars/tensor/einsum/einsumfunc.py @@ -719,7 +719,7 @@ def parse_einsum_input(operands): # Make sure number operands is equivalent to the number of terms if len(input_subscripts.split(",")) != len(operands): raise ValueError( - "Number of einsum subscripts must be equal to the " "number of operands." + "Number of einsum subscripts must be equal to the number of operands." ) return (input_subscripts, output_subscript, operands) diff --git a/mars/tensor/indexing/compress.py b/mars/tensor/indexing/compress.py index 7ce2d4ebaa..16b8728a17 100644 --- a/mars/tensor/indexing/compress.py +++ b/mars/tensor/indexing/compress.py @@ -98,7 +98,7 @@ def compress(condition, a, axis=None, out=None): axis = validate_axis(a.ndim, axis) except ValueError: raise np.AxisError( - f"axis {axis} is out of bounds " f"for tensor of dimension {a.ndim}" + f"axis {axis} is out of bounds for tensor of dimension {a.ndim}" ) try: @@ -120,5 +120,5 @@ def compress(condition, a, axis=None, out=None): return out except IndexError: raise np.AxisError( - f"axis {len(condition)} is out of bounds " "for tensor of dimension 1" + f"axis {len(condition)} is out of bounds for tensor of dimension 1" ) diff --git a/mars/tensor/indexing/take.py b/mars/tensor/indexing/take.py index c075727ec9..21ef40fc04 100644 --- a/mars/tensor/indexing/take.py +++ b/mars/tensor/indexing/take.py @@ -123,7 +123,7 @@ def take(a, indices, axis=None, out=None): if out.shape != t.shape: raise ValueError( - "output tensor has wrong shape, " f"expect: {t.shape}, got: {out.shape}" + f"output tensor has wrong shape, expect: {t.shape}, got: {out.shape}" ) check_out_param(out, t, "unsafe") out.data = t.data diff --git a/mars/tensor/lib/index_tricks.py b/mars/tensor/lib/index_tricks.py index 623885f06a..bbfa964c48 100644 --- a/mars/tensor/lib/index_tricks.py +++ b/mars/tensor/lib/index_tricks.py @@ -211,7 +211,7 @@ def __getitem__(self, key): newobj = newobj.swapaxes(-1, trans1d) elif isinstance(item, str): if k != 0: - raise ValueError("special directives must be the " "first entry.") + raise ValueError("special directives must be the first entry.") if item in ("r", "c"): # pragma: no cover raise NotImplementedError("Does not support operation on matrix") if "," in item: diff --git a/mars/tensor/linalg/cholesky.py b/mars/tensor/linalg/cholesky.py index 949b8ea87a..c62520d5b0 100644 --- a/mars/tensor/linalg/cholesky.py +++ b/mars/tensor/linalg/cholesky.py @@ 
-316,7 +316,7 @@ def cholesky(a, lower=False): if a.ndim != 2: # pragma: no cover raise LinAlgError( - f"{a.ndim}-dimensional array given. " "Tensor must be two-dimensional" + f"{a.ndim}-dimensional array given. Tensor must be two-dimensional" ) if a.shape[0] != a.shape[1]: # pragma: no cover raise LinAlgError("Input must be square") diff --git a/mars/tensor/linalg/inv.py b/mars/tensor/linalg/inv.py index a03a2b1de3..089266d333 100644 --- a/mars/tensor/linalg/inv.py +++ b/mars/tensor/linalg/inv.py @@ -145,7 +145,7 @@ def inv(a, sparse=None): a = astensor(a) if a.ndim != 2: raise LinAlgError( - f"{a.ndim}-dimensional array given. " "Tensor must be two-dimensional" + f"{a.ndim}-dimensional array given. Tensor must be two-dimensional" ) if a.shape[0] != a.shape[1]: raise LinAlgError("Input must be square") diff --git a/mars/tensor/linalg/lu.py b/mars/tensor/linalg/lu.py index edf326e057..36a388f2d6 100644 --- a/mars/tensor/linalg/lu.py +++ b/mars/tensor/linalg/lu.py @@ -41,7 +41,7 @@ def __call__(self, a): a = astensor(a) if a.ndim != 2: raise LinAlgError( - f"{a.ndim}-dimensional array given. " "Tensor must be two-dimensional" + f"{a.ndim}-dimensional array given. Tensor must be two-dimensional" ) if a.shape[0] > a.shape[1]: diff --git a/mars/tensor/linalg/qr.py b/mars/tensor/linalg/qr.py index d7970b6796..57ecec7268 100644 --- a/mars/tensor/linalg/qr.py +++ b/mars/tensor/linalg/qr.py @@ -53,7 +53,7 @@ def __call__(self, a): if a.ndim != 2: raise LinAlgError( - f"{a.ndim}-dimensional tensor given. " "Tensor must be two-dimensional" + f"{a.ndim}-dimensional tensor given. Tensor must be two-dimensional" ) tiny_q, tiny_r = np.linalg.qr(np.ones((1, 1), dtype=a.dtype)) diff --git a/mars/tensor/linalg/svd.py b/mars/tensor/linalg/svd.py index fa72090faa..670540874e 100644 --- a/mars/tensor/linalg/svd.py +++ b/mars/tensor/linalg/svd.py @@ -58,7 +58,7 @@ def __call__(self, a): if a.ndim != 2: raise LinAlgError( - f"{a.ndim}-dimensional tensor given. " "Tensor must be two-dimensional" + f"{a.ndim}-dimensional tensor given. Tensor must be two-dimensional" ) tiny_U, tiny_s, tiny_V = np.linalg.svd(np.ones((1, 1), dtype=a.dtype)) diff --git a/mars/tensor/merge/stack.py b/mars/tensor/merge/stack.py index bbacfa6ea0..3d83e90146 100644 --- a/mars/tensor/merge/stack.py +++ b/mars/tensor/merge/stack.py @@ -209,7 +209,7 @@ def stack(tensors, axis=0, out=None): axis = ndim + axis + 1 if axis > ndim or axis < 0: raise np.AxisError( - f"axis {raw_axis} is out of bounds for tensor " f"of dimension {ndim}" + f"axis {raw_axis} is out of bounds for tensor of dimension {ndim}" ) dtype = np.result_type(*[t.dtype for t in tensors]) diff --git a/mars/tensor/reduction/core.py b/mars/tensor/reduction/core.py index 977eb50e39..a3f6824a26 100644 --- a/mars/tensor/reduction/core.py +++ b/mars/tensor/reduction/core.py @@ -373,7 +373,7 @@ def _get_arg_axis(axis, ndim): axis = (axis,) ravel = ndim == 1 else: - raise TypeError("axis must be either `None` or int, " f"got '{axis}'") + raise TypeError(f"axis must be either `None` or int, got '{axis}'") return axis, ravel @staticmethod diff --git a/mars/tensor/spatial/distance/cdist.py b/mars/tensor/spatial/distance/cdist.py index 965c440962..fe40cd0c8a 100644 --- a/mars/tensor/spatial/distance/cdist.py +++ b/mars/tensor/spatial/distance/cdist.py @@ -529,7 +529,7 @@ def cdist(XA, XB, metric="euclidean", **kwargs): if not isinstance(metric, str) and not callable(metric): raise TypeError( - "3rd argument metric must be a string identifier " "or a function." 
+ "3rd argument metric must be a string identifier or a function." ) # scipy remove "wminkowski" since v1.8.0, use "minkowski" with `w=` diff --git a/mars/tensor/spatial/distance/pdist.py b/mars/tensor/spatial/distance/pdist.py index 45f8176975..c354330bfd 100644 --- a/mars/tensor/spatial/distance/pdist.py +++ b/mars/tensor/spatial/distance/pdist.py @@ -694,7 +694,7 @@ def pdist(X, metric="euclidean", **kwargs): if not callable(metric) and not isinstance(metric, str): raise TypeError( - "2nd argument metric must be a string identifier " "or a function." + "2nd argument metric must be a string identifier or a function." ) # scipy remove "wminkowski" since v1.8.0, use "minkowski" with `w=` diff --git a/mars/tensor/spatial/distance/squareform.py b/mars/tensor/spatial/distance/squareform.py index 9fdd2bd5cb..ee1575b5c3 100644 --- a/mars/tensor/spatial/distance/squareform.py +++ b/mars/tensor/spatial/distance/squareform.py @@ -95,12 +95,12 @@ def __call__(self, X, force="no", chunk_size=None): if force.lower() == "tomatrix": if len(s) != 1: raise ValueError( - "Forcing 'tomatrix' but input X is not a " "distance vector." + "Forcing 'tomatrix' but input X is not a distance vector." ) elif force.lower() == "tovector": if len(s) != 2: raise ValueError( - "Forcing 'tovector' but input X is not a " "distance matrix." + "Forcing 'tovector' but input X is not a distance matrix." ) # X = squareform(v) diff --git a/mars/tensor/statistics/average.py b/mars/tensor/statistics/average.py index 8022ebbfd5..8ceb0b74d7 100644 --- a/mars/tensor/statistics/average.py +++ b/mars/tensor/statistics/average.py @@ -118,7 +118,7 @@ def average(a, axis=None, weights=None, returned=False): if a.shape != wgt.shape: if axis is None: raise TypeError( - "Axis must be specified when shapes of a and weights " "differ." + "Axis must be specified when shapes of a and weights differ." ) if wgt.ndim != 1: raise TypeError( diff --git a/mars/tensor/statistics/quantile.py b/mars/tensor/statistics/quantile.py index f57c28196d..6a1f0d6bcf 100644 --- a/mars/tensor/statistics/quantile.py +++ b/mars/tensor/statistics/quantile.py @@ -545,7 +545,7 @@ def quantile( handle_non_numeric = kw.pop("handle_non_numeric", None) if len(kw) > 0: # pragma: no cover raise TypeError( - "quantile() got an unexpected keyword " f"argument '{next(iter(kw))}'" + f"quantile() got an unexpected keyword argument '{next(iter(kw))}'" ) if not isinstance(q, ENTITY_TYPE): diff --git a/mars/tensor/stats/ttest.py b/mars/tensor/stats/ttest.py index f2adcc8aba..087efe0a58 100644 --- a/mars/tensor/stats/ttest.py +++ b/mars/tensor/stats/ttest.py @@ -84,7 +84,7 @@ def _ttest_finish(df, t, alternative): elif alternative == "two-sided": prob = mt_abs(t).map_chunk(sp_distributions.t.sf, args=(df,)) * 2 else: - raise ValueError("alternative must be " "'less', 'greater' or 'two-sided'") + raise ValueError("alternative must be 'less', 'greater' or 'two-sided'") if t.ndim == 0: t = t[()] return t, prob diff --git a/mars/tensor/utils.py b/mars/tensor/utils.py index 5d2b9932bc..4c65b309f5 100644 --- a/mars/tensor/utils.py +++ b/mars/tensor/utils.py @@ -669,9 +669,7 @@ def check_random_state(seed): return mtrand.RandomState.from_numpy(seed) if isinstance(seed, mtrand.RandomState): return seed - raise ValueError( - f"{seed} cannot be used to seed a mt.random.RandomState" " instance" - ) + raise ValueError(f"{seed} cannot be used to seed a mt.random.RandomState instance") def filter_inputs(inputs):