Skip to content

Commit

Permalink
FEAT-modin-project#1821: More efficient, batching delivering
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov committed Jul 28, 2020
1 parent ffd3113 commit 864b267
Showing 1 changed file with 36 additions and 12 deletions.
48 changes: 36 additions & 12 deletions modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,48 @@
from . import get_connection
from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS

def _batch_loads(items):
return tuple(pickle.loads(item) for item in items)

class WrappingConnection(rpyc.Connection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._remote_pickle_loads = None
self._remote_batch_loads = None
self._remote_cls_cache = {}
def __wrap(self, local_obj):
while True:
# unwrap magic wrappers first
try:
local_obj = object.__getattribute__(local_obj, '__remote_end__')
except AttributeError:
break
# do not pickle netrefs
if isinstance(local_obj, netref.BaseNetref) and local_obj.____conn__ is self:
return None
return bytes(pickle.dumps(local_obj))

def deliver(self, args, kw):


def deliver(self, local_obj):
"""
More caching version of rpyc.classic.deliver()
More efficient, batched version of rpyc.classic.deliver()
"""
try:
local_obj = object.__getattribute__(local_obj, "__remote_end__")
except AttributeError:
pass
if isinstance(local_obj, netref.BaseNetref) and local_obj.____conn__ is self:
return local_obj
return self._remote_pickle_loads(bytes(pickle.dumps(local_obj)))
pickled_args = [self.__wrap(arg) for arg in args]
pickled_kw = [(k, self.__wrap(v)) for (k, v) in kw.items()]

pickled = [i for i in pickled_args if i is not None] + [v for (k, v) in pickled_kw if v is not None]
remote = iter(self._remote_batch_loads(tuple(pickled)))

delivered_args = []
for local_arg, pickled_arg in zip(args, pickled_args):
delivered_args.append(next(remote) if pickled_arg is not None else local_arg)
delivered_kw = {}
for k, pickled_v in pickled_kw:
delivered_kw[k] = next(remote) if pickled_v is not None else kw[k]

return tuple(delivered_args), delivered_kw


def _netref_factory(self, id_pack):
id_name, cls_id, inst_id = id_pack
Expand Down Expand Up @@ -92,6 +116,7 @@ def _box(self, obj):

def _init_deliver(self):
self._remote_pickle_loads = self.modules["rpyc.lib.compat"].pickle.loads
self._remote_batch_loads = self.modules["modin.experimental.cloud.rpyc_proxy"]._batch_loads


class WrappingService(rpyc.ClassicService):
Expand Down Expand Up @@ -374,9 +399,8 @@ class DeliveringMixin:
for method in methods:

def wrapper(self, *args, __remote_conn__=conn, __method_name__=method, **kw):
args = tuple(__remote_conn__.deliver(x) for x in args)
kw = {k: __remote_conn__.deliver(v) for k, v in kw.items()}
cache = object.__getattribute__(self, "__remote_methods__")
args, kw = __remote_conn__.deliver(args, kw)
cache = object.__getattribute__(self, '__remote_methods__')
try:
remote = cache[__method_name__]
except KeyError:
Expand Down

0 comments on commit 864b267

Please sign in to comment.