Skip to content

Commit

Permalink
FEAT-modin-project#1821: Add obtaining of df.dtypes.items()
Browse files Browse the repository at this point in the history
Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov committed Jul 28, 2020
1 parent e8027e2 commit 82709fe
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
def _batch_loads(items):
return tuple(pickle.loads(item) for item in items)

def _tuplize(arg):
'''turns any sequence or iterator into a flat tuple'''
return tuple(arg)

class WrappingConnection(rpyc.Connection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self._remote_batch_loads = None
self._remote_cls_cache = {}
self._static_cache = {}

def __wrap(self, local_obj):
while True:
# unwrap magic wrappers first
Expand Down Expand Up @@ -61,6 +67,21 @@ def deliver(self, args, kw):

return tuple(delivered_args), delivered_kw

def obtain(self, remote):
while True:
try:
remote = object.__getattribute__(remote, '__remote_end__')
except AttributeError:
break
return pickle.loads(self._remote_dumps(remote))

def obtain_tuple(self, remote):
while True:
try:
remote = object.__getattribute__(remote, '__remote_end__')
except AttributeError:
break
return self._remote_tuplize(remote)

def sync_request(self, handler, *args):
if handler == consts.HANDLE_INSPECT:
Expand Down Expand Up @@ -159,6 +180,8 @@ def _box(self, obj):

def _init_deliver(self):
self._remote_batch_loads = self.modules["modin.experimental.cloud.rpyc_proxy"]._batch_loads
self._remote_dumps = self.modules["rpyc.lib.compat"].pickle.dumps
self._remote_tuplize = self.modules["modin.experimental.cloud.rpyc_proxy"]._tuplize


class WrappingService(rpyc.ClassicService):
Expand Down Expand Up @@ -488,10 +511,28 @@ def make_dataframe_wrapper(DataFrame):
It makes DF.loc, DF.groupby() and other methods listed below deliver their
arguments to remote end by value.
"""

from modin.pandas.series import Series

conn = get_connection()
class ObtainingItems:
def items(self):
return conn.obtain_tuple(self.__remote_end__.items())
ObtainingItems = _deliveringWrapper(Series, mixin=ObtainingItems)


@property
def dtypes(self):
remote_dtypes = self.__remote_end__.dtypes
return ObtainingItems(__remote_end__=remote_dtypes)

DataFrameOverrides = _prepare_loc_mixin()
DataFrameOverrides.dtypes = dtypes

DeliveringDataFrame = _deliveringWrapper(
DataFrame,
["groupby", "agg", "aggregate", "__getitem__", "astype", "drop", "merge"],
_prepare_loc_mixin(),
DataFrameOverrides,
"DataFrame",
)
return DeliveringDataFrame
Expand Down

0 comments on commit 82709fe

Please sign in to comment.