diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index e5becdf4189..b6bfdec1801 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -706,7 +706,9 @@ def astype(self, col_dtypes): def astype_builder(df): return df.astype({k: v for k, v in col_dtypes.items() if k in df}) - new_frame = self._frame_mgr_cls.map_partitions(self._partitions, astype_builder) + new_frame = self._frame_mgr_cls.lazy_map_partitions( + self._partitions, astype_builder + ) return self.__constructor__( new_frame, self.index, @@ -1041,7 +1043,7 @@ def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True): else: reduce_func = self._build_mapreduce_func(axis, reduce_func) - map_parts = self._frame_mgr_cls.map_partitions(self._partitions, map_func) + map_parts = self._frame_mgr_cls.lazy_map_partitions(self._partitions, map_func) reduce_parts = self._frame_mgr_cls.map_axis_partitions( axis, map_parts, reduce_func ) @@ -1079,7 +1081,7 @@ def _map(self, func, dtypes=None, validate_index=False, validate_columns=False): ------- A new dataframe. """ - new_partitions = self._frame_mgr_cls.map_partitions(self._partitions, func) + new_partitions = self._frame_mgr_cls.lazy_map_partitions(self._partitions, func) if dtypes == "copy": dtypes = self._dtypes elif dtypes is not None: diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 9208b3b3741..40925001d93 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -83,7 +83,7 @@ def groupby_reduce(cls, axis, partitions, by, map_func, reduce_func): new_partitions = np.array( [ [ - part.apply( + part.add_to_apply_calls( map_func, other=by_parts[col_idx].get() if axis @@ -191,7 +191,7 @@ def broadcast_apply(cls, axis, apply_func, left, right): return np.array( [ [ - part.apply( + part.add_to_apply_calls( apply_func, r=right_parts[col_idx].get() if axis @@ -451,7 +451,9 @@ def _apply_func_to_list_of_partitions_broadcast( ): preprocessed_func = cls.preprocess_func(func) return [ - obj.apply(preprocessed_func, other=[o.get() for o in broadcasted], **kwargs) + obj.add_to_apply_calls( + preprocessed_func, other=[o.get() for o in broadcasted], **kwargs + ) for obj, broadcasted in zip(partitions, other.T) ] @@ -469,7 +471,9 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs): A list of BaseFramePartition objects. """ preprocessed_func = cls.preprocess_func(func) - return [obj.apply(preprocessed_func, **kwargs) for obj in partitions] + return [ + obj.add_to_apply_calls(preprocessed_func, **kwargs) for obj in partitions + ] @classmethod def apply_func_to_select_indices(