Skip to content

Commit

Permalink
FEAT-modin-project#1821: Format stuff by black
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov committed Jul 28, 2020
1 parent fdeb26e commit 5a2c2b5
Showing 1 changed file with 35 additions and 22 deletions.
57 changes: 35 additions & 22 deletions modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
from . import get_connection
from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS


def _batch_loads(items):
return tuple(pickle.loads(item) for item in items)


def _tuplize(arg):
'''turns any sequence or iterator into a flat tuple'''
"""turns any sequence or iterator into a flat tuple"""
return tuple(arg)


class WrappingConnection(rpyc.Connection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
Expand All @@ -39,13 +42,11 @@ def __init__(self, *a, **kw):
self._remote_dumps = None
self._remote_tuplize = None



def __wrap(self, local_obj):
while True:
# unwrap magic wrappers first
try:
local_obj = object.__getattribute__(local_obj, '__remote_end__')
local_obj = object.__getattribute__(local_obj, "__remote_end__")
except AttributeError:
break
# do not pickle netrefs
Expand All @@ -55,57 +56,60 @@ def __wrap(self, local_obj):

def deliver(self, args, kw):


"""
More efficient, batched version of rpyc.classic.deliver()
"""
pickled_args = [self.__wrap(arg) for arg in args]
pickled_kw = [(k, self.__wrap(v)) for (k, v) in kw.items()]

pickled = [i for i in pickled_args if i is not None] + [v for (k, v) in pickled_kw if v is not None]
pickled = [i for i in pickled_args if i is not None] + [
v for (k, v) in pickled_kw if v is not None
]
remote = iter(self._remote_batch_loads(tuple(pickled)))

delivered_args = []
for local_arg, pickled_arg in zip(args, pickled_args):
delivered_args.append(next(remote) if pickled_arg is not None else local_arg)
delivered_args.append(
next(remote) if pickled_arg is not None else local_arg
)
delivered_kw = {}
for k, pickled_v in pickled_kw:
delivered_kw[k] = next(remote) if pickled_v is not None else kw[k]

return tuple(delivered_args), delivered_kw

def obtain(self, remote):
while True:
try:
remote = object.__getattribute__(remote, '__remote_end__')
remote = object.__getattribute__(remote, "__remote_end__")
except AttributeError:
break
return pickle.loads(self._remote_dumps(remote))

def obtain_tuple(self, remote):
while True:
try:
remote = object.__getattribute__(remote, '__remote_end__')
remote = object.__getattribute__(remote, "__remote_end__")
except AttributeError:
break
return self._remote_tuplize(remote)

def sync_request(self, handler, *args):
if handler == consts.HANDLE_INSPECT:
id_name = str(args[0][0])
if id_name.split('.', 1)[0] in ('modin', 'pandas', 'numpy'):
if id_name.split(".", 1)[0] in ("modin", "pandas", "numpy"):
try:
modobj = __import__(id_name)
for subname in id_name.split('.')[1:]:
for subname in id_name.split(".")[1:]:
modobj = getattr(modobj, subname)
except (ImportError, AttributeError):
pass
else:
return get_methods(netref.LOCAL_ATTRS, modobj)
modname, clsname = id_name.rsplit('.', 1)
modname, clsname = id_name.rsplit(".", 1)
try:
modobj = __import__(modname)
for subname in modname.split('.')[1:]:
for subname in modname.split(".")[1:]:
modobj = getattr(modobj, subname)
clsobj = getattr(modobj, clsname)
except (ImportError, AttributeError):
Expand All @@ -120,28 +124,28 @@ def sync_request(self, handler, *args):
obj = args[0]
key = handler

if str(obj.____id_pack__[0]) in {'numpy', 'numpy.dtype'}:
if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}:
cache = self._static_cache[obj.____id_pack__]
try:
result = cache[key]
except KeyError:
result = cache[key] = super().sync_request(handler, *args)
if handler == consts.HANDLE_GETATTR:
# save an entry in our cache telling that we get this attribute cached
self._static_cache[result.____id_pack__]['__getattr__'] = True
self._static_cache[result.____id_pack__]["__getattr__"] = True
return result

return super().sync_request(handler, *args)

def async_request(self, handler, *args, **kw):
if handler == consts.HANDLE_DEL:
obj, _ = args
if str(obj.____id_pack__[0]) in {'numpy', 'numpy.dtype'}:
if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}:
if obj.____id_pack__ in self._static_cache:
# object is cached by us, so ignore the request or remote end dies and cache is suddenly stale;
# we shouldn't remove item from cache as it would reduce performance
res = AsyncResult(self)
res._is_ready = True # simulate finished async request
res._is_ready = True # simulate finished async request
return res
return super().async_request(handler, *args, **kw)

Expand Down Expand Up @@ -197,9 +201,13 @@ def _box(self, obj):
return super()._box(obj)

def _init_deliver(self):
self._remote_batch_loads = self.modules["modin.experimental.cloud.rpyc_proxy"]._batch_loads
self._remote_batch_loads = self.modules[
"modin.experimental.cloud.rpyc_proxy"
]._batch_loads
self._remote_dumps = self.modules["rpyc.lib.compat"].pickle.dumps
self._remote_tuplize = self.modules["modin.experimental.cloud.rpyc_proxy"]._tuplize
self._remote_tuplize = self.modules[
"modin.experimental.cloud.rpyc_proxy"
]._tuplize


class WrappingService(rpyc.ClassicService):
Expand Down Expand Up @@ -331,7 +339,9 @@ def method(_self, *_args, __method_name__=name, **_kw):
except KeyError:
# use remote_cls.__getattr__ to force RPyC return us
# a proxy for remote method call instead of its local wrapper
_self.__remote_methods__[__method_name__] = remote = remote_cls.__getattr__(__method_name__)
_self.__remote_methods__[
__method_name__
] = remote = remote_cls.__getattr__(__method_name__)
return remote(_self.__remote_end__, *_args, **_kw)

method.__name__ = name
Expand Down Expand Up @@ -480,7 +490,7 @@ class DeliveringMixin:

def wrapper(self, *args, __remote_conn__=conn, __method_name__=method, **kw):
args, kw = __remote_conn__.deliver(args, kw)
cache = object.__getattribute__(self, '__remote_methods__')
cache = object.__getattribute__(self, "__remote_methods__")
try:
remote = cache[__method_name__]
except KeyError:
Expand Down Expand Up @@ -530,11 +540,14 @@ def make_dataframe_wrapper(DataFrame):
from modin.pandas.series import Series

conn = get_connection()

class ObtainingItems:
def items(self):
return conn.obtain_tuple(self.__remote_end__.items())

def iteritems(self):
return conn.obtain_tuple(self.__remote_end__.iteritems())

ObtainingItems = _deliveringWrapper(Series, mixin=ObtainingItems)

class DataFrameOverrides(_prepare_loc_mixin()):
Expand Down

0 comments on commit 5a2c2b5

Please sign in to comment.