From 5a2c2b5d2026b872eecd4ec75093c3b7e066884e Mon Sep 17 00:00:00 2001 From: Vasilij Litvinov Date: Thu, 23 Jul 2020 14:55:34 +0300 Subject: [PATCH] FEAT-#1821: Format stuff by black Signed-off-by: Vasilij Litvinov --- modin/experimental/cloud/rpyc_proxy.py | 57 ++++++++++++++++---------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/modin/experimental/cloud/rpyc_proxy.py b/modin/experimental/cloud/rpyc_proxy.py index ff7eadd1877..ca9e986f1ee 100644 --- a/modin/experimental/cloud/rpyc_proxy.py +++ b/modin/experimental/cloud/rpyc_proxy.py @@ -23,13 +23,16 @@ from . import get_connection from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS + def _batch_loads(items): return tuple(pickle.loads(item) for item in items) + def _tuplize(arg): - '''turns any sequence or iterator into a flat tuple''' + """turns any sequence or iterator into a flat tuple""" return tuple(arg) + class WrappingConnection(rpyc.Connection): def __init__(self, *a, **kw): super().__init__(*a, **kw) @@ -39,13 +42,11 @@ def __init__(self, *a, **kw): self._remote_dumps = None self._remote_tuplize = None - - def __wrap(self, local_obj): while True: # unwrap magic wrappers first try: - local_obj = object.__getattribute__(local_obj, '__remote_end__') + local_obj = object.__getattribute__(local_obj, "__remote_end__") except AttributeError: break # do not pickle netrefs @@ -55,29 +56,32 @@ def __wrap(self, local_obj): def deliver(self, args, kw): - """ More efficient, batched version of rpyc.classic.deliver() """ pickled_args = [self.__wrap(arg) for arg in args] pickled_kw = [(k, self.__wrap(v)) for (k, v) in kw.items()] - pickled = [i for i in pickled_args if i is not None] + [v for (k, v) in pickled_kw if v is not None] + pickled = [i for i in pickled_args if i is not None] + [ + v for (k, v) in pickled_kw if v is not None + ] remote = iter(self._remote_batch_loads(tuple(pickled))) delivered_args = [] for local_arg, pickled_arg in zip(args, pickled_args): - delivered_args.append(next(remote) if pickled_arg is not None else local_arg) + delivered_args.append( + next(remote) if pickled_arg is not None else local_arg + ) delivered_kw = {} for k, pickled_v in pickled_kw: delivered_kw[k] = next(remote) if pickled_v is not None else kw[k] - + return tuple(delivered_args), delivered_kw def obtain(self, remote): while True: try: - remote = object.__getattribute__(remote, '__remote_end__') + remote = object.__getattribute__(remote, "__remote_end__") except AttributeError: break return pickle.loads(self._remote_dumps(remote)) @@ -85,7 +89,7 @@ def obtain(self, remote): def obtain_tuple(self, remote): while True: try: - remote = object.__getattribute__(remote, '__remote_end__') + remote = object.__getattribute__(remote, "__remote_end__") except AttributeError: break return self._remote_tuplize(remote) @@ -93,19 +97,19 @@ def obtain_tuple(self, remote): def sync_request(self, handler, *args): if handler == consts.HANDLE_INSPECT: id_name = str(args[0][0]) - if id_name.split('.', 1)[0] in ('modin', 'pandas', 'numpy'): + if id_name.split(".", 1)[0] in ("modin", "pandas", "numpy"): try: modobj = __import__(id_name) - for subname in id_name.split('.')[1:]: + for subname in id_name.split(".")[1:]: modobj = getattr(modobj, subname) except (ImportError, AttributeError): pass else: return get_methods(netref.LOCAL_ATTRS, modobj) - modname, clsname = id_name.rsplit('.', 1) + modname, clsname = id_name.rsplit(".", 1) try: modobj = __import__(modname) - for subname in modname.split('.')[1:]: + for subname in modname.split(".")[1:]: modobj = getattr(modobj, subname) clsobj = getattr(modobj, clsname) except (ImportError, AttributeError): @@ -120,7 +124,7 @@ def sync_request(self, handler, *args): obj = args[0] key = handler - if str(obj.____id_pack__[0]) in {'numpy', 'numpy.dtype'}: + if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}: cache = self._static_cache[obj.____id_pack__] try: result = cache[key] @@ -128,7 +132,7 @@ def sync_request(self, handler, *args): result = cache[key] = super().sync_request(handler, *args) if handler == consts.HANDLE_GETATTR: # save an entry in our cache telling that we get this attribute cached - self._static_cache[result.____id_pack__]['__getattr__'] = True + self._static_cache[result.____id_pack__]["__getattr__"] = True return result return super().sync_request(handler, *args) @@ -136,12 +140,12 @@ def sync_request(self, handler, *args): def async_request(self, handler, *args, **kw): if handler == consts.HANDLE_DEL: obj, _ = args - if str(obj.____id_pack__[0]) in {'numpy', 'numpy.dtype'}: + if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}: if obj.____id_pack__ in self._static_cache: # object is cached by us, so ignore the request or remote end dies and cache is suddenly stale; # we shouldn't remove item from cache as it would reduce performance res = AsyncResult(self) - res._is_ready = True # simulate finished async request + res._is_ready = True # simulate finished async request return res return super().async_request(handler, *args, **kw) @@ -197,9 +201,13 @@ def _box(self, obj): return super()._box(obj) def _init_deliver(self): - self._remote_batch_loads = self.modules["modin.experimental.cloud.rpyc_proxy"]._batch_loads + self._remote_batch_loads = self.modules[ + "modin.experimental.cloud.rpyc_proxy" + ]._batch_loads self._remote_dumps = self.modules["rpyc.lib.compat"].pickle.dumps - self._remote_tuplize = self.modules["modin.experimental.cloud.rpyc_proxy"]._tuplize + self._remote_tuplize = self.modules[ + "modin.experimental.cloud.rpyc_proxy" + ]._tuplize class WrappingService(rpyc.ClassicService): @@ -331,7 +339,9 @@ def method(_self, *_args, __method_name__=name, **_kw): except KeyError: # use remote_cls.__getattr__ to force RPyC return us # a proxy for remote method call instead of its local wrapper - _self.__remote_methods__[__method_name__] = remote = remote_cls.__getattr__(__method_name__) + _self.__remote_methods__[ + __method_name__ + ] = remote = remote_cls.__getattr__(__method_name__) return remote(_self.__remote_end__, *_args, **_kw) method.__name__ = name @@ -480,7 +490,7 @@ class DeliveringMixin: def wrapper(self, *args, __remote_conn__=conn, __method_name__=method, **kw): args, kw = __remote_conn__.deliver(args, kw) - cache = object.__getattribute__(self, '__remote_methods__') + cache = object.__getattribute__(self, "__remote_methods__") try: remote = cache[__method_name__] except KeyError: @@ -530,11 +540,14 @@ def make_dataframe_wrapper(DataFrame): from modin.pandas.series import Series conn = get_connection() + class ObtainingItems: def items(self): return conn.obtain_tuple(self.__remote_end__.items()) + def iteritems(self): return conn.obtain_tuple(self.__remote_end__.iteritems()) + ObtainingItems = _deliveringWrapper(Series, mixin=ObtainingItems) class DataFrameOverrides(_prepare_loc_mixin()):