Skip to content

Commit

Permalink
FEAT-modin-project#1838: Lazy map evaluation at Pandas backend (modin…
Browse files (browse the repository at this point in the history)
  • Loading branch information
dchigarev authored Aug 28, 2020
1 parent: 81cf80c; commit: b867edf
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
8 changes: 5 additions & 3 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,9 @@ def astype(self, col_dtypes):
def astype_builder(df):
return df.astype({k: v for k, v in col_dtypes.items() if k in df})

new_frame = self._frame_mgr_cls.map_partitions(self._partitions, astype_builder)
new_frame = self._frame_mgr_cls.lazy_map_partitions(
self._partitions, astype_builder
)
return self.__constructor__(
new_frame,
self.index,
Expand Down Expand Up @@ -1041,7 +1043,7 @@ def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True):
else:
reduce_func = self._build_mapreduce_func(axis, reduce_func)

map_parts = self._frame_mgr_cls.map_partitions(self._partitions, map_func)
map_parts = self._frame_mgr_cls.lazy_map_partitions(self._partitions, map_func)
reduce_parts = self._frame_mgr_cls.map_axis_partitions(
axis, map_parts, reduce_func
)
Expand Down Expand Up @@ -1079,7 +1081,7 @@ def _map(self, func, dtypes=None, validate_index=False, validate_columns=False):
-------
A new dataframe.
"""
new_partitions = self._frame_mgr_cls.map_partitions(self._partitions, func)
new_partitions = self._frame_mgr_cls.lazy_map_partitions(self._partitions, func)
if dtypes == "copy":
dtypes = self._dtypes
elif dtypes is not None:
Expand Down
12 changes: 8 additions & 4 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def groupby_reduce(cls, axis, partitions, by, map_func, reduce_func):
new_partitions = np.array(
[
[
part.apply(
part.add_to_apply_calls(
map_func,
other=by_parts[col_idx].get()
if axis
Expand Down Expand Up @@ -191,7 +191,7 @@ def broadcast_apply(cls, axis, apply_func, left, right):
return np.array(
[
[
part.apply(
part.add_to_apply_calls(
apply_func,
r=right_parts[col_idx].get()
if axis
Expand Down Expand Up @@ -451,7 +451,9 @@ def _apply_func_to_list_of_partitions_broadcast(
):
preprocessed_func = cls.preprocess_func(func)
return [
obj.apply(preprocessed_func, other=[o.get() for o in broadcasted], **kwargs)
obj.add_to_apply_calls(
preprocessed_func, other=[o.get() for o in broadcasted], **kwargs
)
for obj, broadcasted in zip(partitions, other.T)
]

Expand All @@ -469,7 +471,9 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs):
A list of BaseFramePartition objects.
"""
preprocessed_func = cls.preprocess_func(func)
return [obj.apply(preprocessed_func, **kwargs) for obj in partitions]
return [
obj.add_to_apply_calls(preprocessed_func, **kwargs) for obj in partitions
]

@classmethod
def apply_func_to_select_indices(
Expand Down

0 comments on commit b867edf

Please sign in to comment.