Skip to content

Commit

Permalink
FEAT-modin-project#1821: Add comments and docstrings
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov committed Jul 28, 2020
1 parent c1c86f5 commit 99650d7
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ def __init__(self, *a, **kw):

def __wrap(self, local_obj):
while True:
# unwrap magic wrappers first
# unwrap magic wrappers first; keep unwrapping in case it's a wrapper-in-a-wrapper
# this shouldn't usually happen, so this is mostly a safety net
try:
local_obj = object.__getattribute__(local_obj, "__remote_end__")
except AttributeError:
break
# do not pickle netrefs
# do not pickle netrefs of our current connection, but do pickle those of other connections;
# example of this: an object made in _other_ remote context being passed to ours
if isinstance(local_obj, netref.BaseNetref) and local_obj.____conn__ is self:
return None
return bytes(pickle.dumps(local_obj))
Expand Down Expand Up @@ -95,7 +97,15 @@ def obtain_tuple(self, remote):
return self._remote_tuplize(remote)

def sync_request(self, handler, *args):
"""
Intercept outgoing synchronous requests from RPyC to add caching or
to fulfill them locally if possible to improve performance.
We should try to make as few remote calls as possible, because each
call adds to latency.
"""
if handler == consts.HANDLE_INSPECT:
# always inspect classes from modin, pandas and numpy locally,
# do not go to network for those
id_name = str(args[0][0])
if id_name.split(".", 1)[0] in ("modin", "pandas", "numpy"):
try:
Expand Down Expand Up @@ -125,6 +135,10 @@ def sync_request(self, handler, *args):
key = handler

if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}:
# assume numpy attributes and numpy.dtype attributes are always the same;
# note that we're using RPyC id_pack as cache key, and it includes the name,
# class id and instance id, so this cache is unique to each instance of, say,
# numpy.dtype(), hence numpy.int16 and numpy.float64 get different caches.
cache = self._static_cache[obj.____id_pack__]
try:
result = cache[key]
Expand All @@ -138,15 +152,21 @@ def sync_request(self, handler, *args):
return super().sync_request(handler, *args)

def async_request(self, handler, *args, **kw):
"""
Override async request handling to intercept outgoing deletion requests because we cache
certain things, and if we allow deletion of cached things our cache becomes stale.
We can clean the cache upon deletion, but it would increase cache misses a lot.
Also note that memory is not leaked forever, RPyC frees all of it upon disconnect.
"""
if handler == consts.HANDLE_DEL:
obj, _ = args
if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}:
if obj.____id_pack__ in self._static_cache:
# object is cached by us, so ignore the request or remote end dies and cache is suddenly stale;
# we shouldn't remove item from cache as it would reduce performance
res = AsyncResult(self)
res._is_ready = True # simulate finished async request
return res
if obj.____id_pack__ in self._static_cache:
# object is cached by us, so ignore the request or remote end dies and cache is suddenly stale;
# we shouldn't remove item from cache as it would reduce performance
res = AsyncResult(self)
res._is_ready = True # simulate finished async request
return res
return super().async_request(handler, *args, **kw)

def _netref_factory(self, id_pack):
Expand Down Expand Up @@ -494,7 +514,10 @@ def wrapper(self, *args, __remote_conn__=conn, __method_name__=method, **kw):
try:
remote = cache[__method_name__]
except KeyError:
cache[__method_name__] = remote = getattr(remote_cls, __method_name__)
# see comments in ProxyMeta.__prepare__ on using remote_cls.__getattr__
cache[__method_name__] = remote = remote_cls.__getattr__(
__method_name__
)
return remote(self.__remote_end__, *args, **kw)

wrapper.__name__ = method
Expand Down

0 comments on commit 99650d7

Please sign in to comment.