diff --git a/modin/experimental/cloud/rpyc_proxy.py b/modin/experimental/cloud/rpyc_proxy.py index 5e2bbfa1f7d..e7dcd9a0e10 100644 --- a/modin/experimental/cloud/rpyc_proxy.py +++ b/modin/experimental/cloud/rpyc_proxy.py @@ -15,7 +15,9 @@ import rpyc from rpyc.lib.compat import pickle -from rpyc.core import netref +from rpyc.lib import get_methods + +from rpyc.core import netref, AsyncResult from . import get_connection from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS @@ -127,6 +129,29 @@ def sync_request(self, handler, *args): return super().sync_request(handler, *args) + def async_request(self, handler, *args, **kw): + if handler == consts.HANDLE_DEL: + obj, refcount = args + try: + obj_class = object.__getattribute__(obj, '__class__') + except AttributeError: + obj_class = None + if type(obj).__name__ in ('numpy',) or (getattr(obj_class, '__module__', None) in ('numpy',) and obj_class.__name__ in ('dtype',)): + """ + # we have this cached, but a deletion is requested, remove the from cache + self._static_cache.pop(obj.____id_pack__, None) + """ + try: + cache = self._static_cache[obj.____id_pack__] + except KeyError: + pass + else: + # object is cached by us, so ignore the request or remote end dies and cache is suddenly stale + res = AsyncResult(self) + res._is_ready = True + return res + return super().async_request(handler, *args, **kw) + def _netref_factory(self, id_pack): id_name, cls_id, inst_id = id_pack id_name = str(id_name) @@ -518,6 +543,8 @@ def make_dataframe_wrapper(DataFrame): class ObtainingItems: def items(self): return conn.obtain_tuple(self.__remote_end__.items()) + def iteritems(self): + return conn.obtain_tuple(self.__remote_end__.iteritems()) ObtainingItems = _deliveringWrapper(Series, mixin=ObtainingItems)