Skip to content

Commit

Permalink
FEAT-#1821: Speed up RPyC connection (#1833)
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov authored Jul 28, 2020
1 parent 02977df commit 0a03e7a
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 26 deletions.
3 changes: 1 addition & 2 deletions modin/data_management/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ def __getattr__(self, name):
except KeyError:

def wrap(*a, _original=getattr(self.__io_cls, name), **kw):
a = tuple(self.__conn.deliver(x) for x in a)
kw = {k: self.__conn.deliver(v) for k, v in kw.items()}
a, kw = self.__conn.deliver(a, kw)
return _original(*a, **kw)

self.__wrappers[name] = wrap
Expand Down
9 changes: 5 additions & 4 deletions modin/experimental/cloud/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,12 @@ def __try_connect(self):
from .rpyc_proxy import WrappingService

try:
self.__connection = rpyc.connect(
"127.0.0.1",
self.rpyc_port,
stream = rpyc.SocketStream.connect(
host="127.0.0.1", port=self.rpyc_port, nodelay=True, keepalive=True
)
self.__connection = rpyc.connect_stream(
stream,
WrappingService,
keepalive=True,
config={"sync_request_timeout": RPYC_REQUEST_TIMEOUT},
)
except (ConnectionRefusedError, EOFError):
Expand Down
223 changes: 203 additions & 20 deletions modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,177 @@
# governing permissions and limitations under the License.

import types
import collections

import rpyc
from rpyc.lib.compat import pickle
from rpyc.core import netref
from rpyc.lib import get_methods

from rpyc.core import netref, AsyncResult, consts

from . import get_connection
from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS


def _batch_loads(items):
return tuple(pickle.loads(item) for item in items)


def _tuplize(arg):
"""turns any sequence or iterator into a flat tuple"""
return tuple(arg)


class WrappingConnection(rpyc.Connection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._remote_pickle_loads = None
self._remote_batch_loads = None
self._remote_cls_cache = {}
self._static_cache = collections.defaultdict(dict)
self._remote_dumps = None
self._remote_tuplize = None

def __wrap(self, local_obj):
while True:
# unwrap magic wrappers first; keep unwrapping in case it's a wrapper-in-a-wrapper
# this shouldn't usually happen, so this is mostly a safety net
try:
local_obj = object.__getattribute__(local_obj, "__remote_end__")
except AttributeError:
break
# do not pickle netrefs of our current connection, but do pickle those of other;
# example of this: an object made in _other_ remote context being pased to ours
if isinstance(local_obj, netref.BaseNetref) and local_obj.____conn__ is self:
return None
return bytes(pickle.dumps(local_obj))

def deliver(self, args, kw):

def deliver(self, local_obj):
"""
More caching version of rpyc.classic.deliver()
More efficient, batched version of rpyc.classic.deliver()
"""
try:
local_obj = object.__getattribute__(local_obj, "__remote_end__")
except AttributeError:
pass
if isinstance(local_obj, netref.BaseNetref) and local_obj.____conn__ is self:
return local_obj
return self._remote_pickle_loads(bytes(pickle.dumps(local_obj)))
pickled_args = [self.__wrap(arg) for arg in args]
pickled_kw = [(k, self.__wrap(v)) for (k, v) in kw.items()]

pickled = [i for i in pickled_args if i is not None] + [
v for (k, v) in pickled_kw if v is not None
]
remote = iter(self._remote_batch_loads(tuple(pickled)))

delivered_args = []
for local_arg, pickled_arg in zip(args, pickled_args):
delivered_args.append(
next(remote) if pickled_arg is not None else local_arg
)
delivered_kw = {}
for k, pickled_v in pickled_kw:
delivered_kw[k] = next(remote) if pickled_v is not None else kw[k]

return tuple(delivered_args), delivered_kw

def obtain(self, remote):
while True:
try:
remote = object.__getattribute__(remote, "__remote_end__")
except AttributeError:
break
return pickle.loads(self._remote_dumps(remote))

def obtain_tuple(self, remote):
while True:
try:
remote = object.__getattribute__(remote, "__remote_end__")
except AttributeError:
break
return self._remote_tuplize(remote)

def sync_request(self, handler, *args):
"""
Intercept outgoing synchronous requests from RPyC to add caching or
fulfilling them locally if possible to improve performance.
We should try to make as few remote calls as possible, because each
call adds up to latency.
"""
if handler == consts.HANDLE_INSPECT:
# always inspect classes from modin, pandas and numpy locally,
# do not go to network for those
id_name = str(args[0][0])
if id_name.split(".", 1)[0] in ("modin", "pandas", "numpy"):
try:
modobj = __import__(id_name)
for subname in id_name.split(".")[1:]:
modobj = getattr(modobj, subname)
except (ImportError, AttributeError):
pass
else:
return get_methods(netref.LOCAL_ATTRS, modobj)
modname, clsname = id_name.rsplit(".", 1)
try:
modobj = __import__(modname)
for subname in modname.split(".")[1:]:
modobj = getattr(modobj, subname)
clsobj = getattr(modobj, clsname)
except (ImportError, AttributeError):
pass
else:
return get_methods(netref.LOCAL_ATTRS, clsobj)
elif handler in (consts.HANDLE_GETATTR, consts.HANDLE_STR, consts.HANDLE_HASH):
if handler == consts.HANDLE_GETATTR:
obj, attr = args
key = (attr, handler)
else:
obj = args[0]
key = handler

if str(obj.____id_pack__[0]) in {"numpy", "numpy.dtype"}:
# always assume numpy attributes and numpy.dtype attributes are always the same;
# note that we're using RPyC id_pack as cache key, and it includes the name,
# class id and instance id, so this cache is unique to each instance of, say,
# numpy.dtype(), hence numpy.int16 and numpy.float64 got different caches.
cache = self._static_cache[obj.____id_pack__]
try:
result = cache[key]
except KeyError:
result = cache[key] = super().sync_request(handler, *args)
if handler == consts.HANDLE_GETATTR:
# save an entry in our cache telling that we get this attribute cached
self._static_cache[result.____id_pack__]["__getattr__"] = True
return result

return super().sync_request(handler, *args)

def async_request(self, handler, *args, **kw):
"""
Override async request handling to intercept outgoing deletion requests because we cache
certain things, and if we allow deletion of cached things our cache becomes stale.
We can clean the cache upon deletion, but it would increase cache misses a lot.
Also note that memory is not leaked forever, RPyC frees all of it upon disconnect.
"""
if handler == consts.HANDLE_DEL:
obj, _ = args
if obj.____id_pack__ in self._static_cache:
# object is cached by us, so ignore the request or remote end dies and cache is suddenly stale;
# we shouldn't remove item from cache as it would reduce performance
res = AsyncResult(self)
res._is_ready = True # simulate finished async request
return res
return super().async_request(handler, *args, **kw)

def _netref_factory(self, id_pack):
result = super()._netref_factory(id_pack)
id_name, cls_id, inst_id = id_pack
id_name = str(id_name)
first = id_name.split(".", 1)[0]
if first in ("modin", "numpy", "pandas") and inst_id:
try:
cached_cls = self._remote_cls_cache[(id_name, cls_id)]
except KeyError:
result = super()._netref_factory(id_pack)
self._remote_cls_cache[(id_name, cls_id)] = type(result)
else:
result = cached_cls(self, id_pack)
else:
result = super()._netref_factory(id_pack)
# try getting __real_cls__ from result.__class__ BUT make sure to
# NOT get it from some parent class for result.__class__, otherwise
# multiple wrappings happen
Expand Down Expand Up @@ -78,7 +221,13 @@ def _box(self, obj):
return super()._box(obj)

def _init_deliver(self):
self._remote_pickle_loads = self.modules["rpyc.lib.compat"].pickle.loads
self._remote_batch_loads = self.modules[
"modin.experimental.cloud.rpyc_proxy"
]._batch_loads
self._remote_dumps = self.modules["rpyc.lib.compat"].pickle.dumps
self._remote_tuplize = self.modules[
"modin.experimental.cloud.rpyc_proxy"
]._tuplize


class WrappingService(rpyc.ClassicService):
Expand Down Expand Up @@ -167,6 +316,7 @@ def __prepare__(*args, **kw):
type(obj).__dict__ (EXCLUDING parent classes) and then goes to proxy type.
"""
namespace = type.__prepare__(*args, **kw)
namespace["__remote_methods__"] = {}

# try computing overridden differently to allow subclassing one override from another
no_override = set(_NO_OVERRIDE)
Expand Down Expand Up @@ -204,9 +354,15 @@ def __prepare__(*args, **kw):
):

def method(_self, *_args, __method_name__=name, **_kw):
return getattr(_self.__remote_end__, __method_name__)(
*_args, **_kw
)
try:
remote = _self.__remote_methods__[__method_name__]
except KeyError:
# use remote_cls.__getattr__ to force RPyC return us
# a proxy for remote method call instead of its local wrapper
_self.__remote_methods__[
__method_name__
] = remote = remote_cls.__getattr__(__method_name__)
return remote(_self.__remote_end__, *_args, **_kw)

method.__name__ = name
namespace[name] = method
Expand Down Expand Up @@ -353,9 +509,16 @@ class DeliveringMixin:
for method in methods:

def wrapper(self, *args, __remote_conn__=conn, __method_name__=method, **kw):
args = tuple(__remote_conn__.deliver(x) for x in args)
kw = {k: __remote_conn__.deliver(v) for k, v in kw.items()}
return getattr(self.__remote_end__, __method_name__)(*args, **kw)
args, kw = __remote_conn__.deliver(args, kw)
cache = object.__getattribute__(self, "__remote_methods__")
try:
remote = cache[__method_name__]
except KeyError:
# see comments in ProxyMeta.__prepare__ on using remote_cls.__getattr__
cache[__method_name__] = remote = remote_cls.__getattr__(
__method_name__
)
return remote(self.__remote_end__, *args, **kw)

wrapper.__name__ = method
setattr(mixin, method, wrapper)
Expand Down Expand Up @@ -396,10 +559,30 @@ def make_dataframe_wrapper(DataFrame):
It makes DF.loc, DF.groupby() and other methods listed below deliver their
arguments to remote end by value.
"""

from modin.pandas.series import Series

conn = get_connection()

class ObtainingItems:
def items(self):
return conn.obtain_tuple(self.__remote_end__.items())

def iteritems(self):
return conn.obtain_tuple(self.__remote_end__.iteritems())

ObtainingItems = _deliveringWrapper(Series, mixin=ObtainingItems)

class DataFrameOverrides(_prepare_loc_mixin()):
@property
def dtypes(self):
remote_dtypes = self.__remote_end__.dtypes
return ObtainingItems(__remote_end__=remote_dtypes)

DeliveringDataFrame = _deliveringWrapper(
DataFrame,
["groupby", "agg", "aggregate", "__getitem__", "astype", "drop", "merge"],
_prepare_loc_mixin(),
DataFrameOverrides,
"DataFrame",
)
return DeliveringDataFrame
Expand Down

0 comments on commit 0a03e7a

Please sign in to comment.