From af7ca3fc76b0a6b376c4c99ae3eab70e62d3e575 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Mon, 8 Jul 2024 12:55:50 +0000 Subject: [PATCH 01/11] FEAT-#7337: Using dynamic partitionning in broadcast_apply Signed-off-by: Kirill Suvorov --- .../pandas/partitioning/partition_manager.py | 121 +++++++++++++++++- 1 file changed, 120 insertions(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 05854239206..fdc2a5662d7 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -440,7 +440,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def broadcast_apply(cls, axis, apply_func, left, right): + def base_broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -493,6 +493,68 @@ def map_func(df, *others): ] ) + @classmethod + @wait_computations_if_benchmark_mode + def broadcast_axis( + cls, + axis, + apply_func, + left, + right, + keep_partitioning=False, + ): + """ + Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`. + + Parameters + ---------- + axis : {0, 1} + Axis to apply and broadcast over. + apply_func : callable + Function to apply. + left : NumPy 2D array + Left partitions. + right : NumPy 2D array + Right partitions. + keep_partitioning : boolean, default: False + The flag to keep partition boundaries for Modin Frame if possible. + Setting it to True disables shuffling data from one partition to another in case the resulting + number of splits is equal to the initial number of splits. + + Returns + ------- + NumPy array + An array of partition objects. + + Notes + ----- + This method differs from `broadcast_axis_partitions` in that it does not send + all right partitions for each remote task based on the left partitions. + """ + num_splits = len(left) if axis == 0 else len(left.T) + preprocessed_map_func = cls.preprocess_func(apply_func) + left_partitions = cls.axis_partition(left, axis) + right_partitions = None if right is None else cls.axis_partition(right, axis) + kw = { + "num_splits": num_splits, + "maintain_partitioning": keep_partitioning, + } + + result_blocks = np.array( + [ + left_partitions[i].apply( + preprocessed_map_func, + other_axis_partition=right_partitions[i], + **kw, + ) + for i in np.arange(len(left_partitions)) + ] + ) + # If we are mapping over columns, they are returned to use the same as + # rows, so we need to transpose the returned 2D NumPy array to return + # the structure to the correct order. + return result_blocks.T if not axis else result_blocks + @classmethod @wait_computations_if_benchmark_mode def broadcast_axis_partitions( @@ -648,6 +710,63 @@ def base_map_partitions( ] ) + @classmethod + @wait_computations_if_benchmark_mode + def broadcast_apply( + cls, + axis, + apply_func, + left, + right, + ): + """ + Broadcast the `right` partitions to `left` and apply `apply_func` function + using different approaches to achieve the best performance. + + Parameters + ---------- + axis : {0, 1} + Axis to apply and broadcast over. + apply_func : callable + Function to apply. + left : np.ndarray + NumPy array of left partitions. + right : np.ndarray + NumPy array of right partitions. + + Returns + ------- + np.ndarray + NumPy array of result partition objects. + """ + # The condition for the execution of `base_broadcast_apply` is different from + # the same condition in the `map_partitions`, since the columnar partitioning approach + # cannot be implemented for the `broadcast_apply`. This is due to the fact that different + # partitions of the left and right dataframes are possible for the `broadcast_apply`, + # as a result of which it is necessary to merge partitions on both axes at once, + # which leads to large slowdowns. + if ( + np.prod(left.shape) <= 1.5 * CpuCount.get() + or left.shape[axis] < CpuCount.get() // 5 + ): + # block-wise broadcast + new_partitions = cls.base_broadcast_apply( + axis, + apply_func, + left, + right, + ) + else: + # axis-wise broadcast + new_partitions = cls.broadcast_axis( + axis=axis ^ 1, + left=left, + right=right, + apply_func=apply_func, + keep_partitioning=True, + ) + return new_partitions + @classmethod @wait_computations_if_benchmark_mode def map_partitions( From 78df91d7a6a6519abebdadb9952658c444a6845d Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Wed, 10 Jul 2024 17:48:40 +0000 Subject: [PATCH 02/11] fix ci --- modin/core/dataframe/algebra/groupby.py | 8 ++++-- .../dataframe/pandas/dataframe/dataframe.py | 4 +-- .../pandas/partitioning/partition_manager.py | 28 ++++++------------- 3 files changed, 15 insertions(+), 25 deletions(-) diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index cc9196a422a..fec0fe3c6ac 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -655,9 +655,11 @@ def aggregate_on_dict(grp_obj, *args, **kwargs): ) native_res_part = [] if native_agg_res is None else [native_agg_res] - result = pandas.concat( - [*native_res_part, *custom_results], axis=1, copy=False - ) + parts = [*native_res_part, *custom_results] + if parts: + result = pandas.concat(parts, axis=1, copy=False) + else: + result = pandas.DataFrame(columns=result_columns) # The order is naturally preserved if there's no custom aggregations if preserve_aggregation_order and len(custom_aggs): diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5456f28f127..5d46d4b3190 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3265,9 +3265,7 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) + new_frame = self._partition_mgr_cls.apply(axis, func, left_parts, right_parts) if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index fdc2a5662d7..0a422d04159 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -339,9 +339,7 @@ def groupby_reduce( f"the number of partitions along {axis=} is not equal: " + f"{partitions.shape[axis]} != {by.shape[axis]}" ) - mapped_partitions = cls.broadcast_apply( - axis, map_func, left=partitions, right=by - ) + mapped_partitions = cls.apply(axis, map_func, left=partitions, right=by) else: mapped_partitions = cls.map_partitions(partitions, map_func) @@ -440,7 +438,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def base_broadcast_apply(cls, axis, apply_func, left, right): + def broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -495,13 +493,12 @@ def map_func(df, *others): @classmethod @wait_computations_if_benchmark_mode - def broadcast_axis( + def apply_axis_partitions( cls, axis, apply_func, left, right, - keep_partitioning=False, ): """ Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`. @@ -531,21 +528,15 @@ def broadcast_axis( This method differs from `broadcast_axis_partitions` in that it does not send all right partitions for each remote task based on the left partitions. """ - num_splits = len(left) if axis == 0 else len(left.T) preprocessed_map_func = cls.preprocess_func(apply_func) left_partitions = cls.axis_partition(left, axis) right_partitions = None if right is None else cls.axis_partition(right, axis) - kw = { - "num_splits": num_splits, - "maintain_partitioning": keep_partitioning, - } result_blocks = np.array( [ left_partitions[i].apply( preprocessed_map_func, other_axis_partition=right_partitions[i], - **kw, ) for i in np.arange(len(left_partitions)) ] @@ -712,7 +703,7 @@ def base_map_partitions( @classmethod @wait_computations_if_benchmark_mode - def broadcast_apply( + def apply( cls, axis, apply_func, @@ -739,10 +730,10 @@ def broadcast_apply( np.ndarray NumPy array of result partition objects. """ - # The condition for the execution of `base_broadcast_apply` is different from + # The condition for the execution of `broadcast_apply` is different from # the same condition in the `map_partitions`, since the columnar partitioning approach - # cannot be implemented for the `broadcast_apply`. This is due to the fact that different - # partitions of the left and right dataframes are possible for the `broadcast_apply`, + # cannot be implemented for the `apply`. This is due to the fact that different + # partitions of the left and right dataframes are possible for the `apply`, # as a result of which it is necessary to merge partitions on both axes at once, # which leads to large slowdowns. if ( @@ -750,7 +741,7 @@ def broadcast_apply( or left.shape[axis] < CpuCount.get() // 5 ): # block-wise broadcast - new_partitions = cls.base_broadcast_apply( + new_partitions = cls.broadcast_apply( axis, apply_func, left, @@ -758,12 +749,11 @@ def broadcast_apply( ) else: # axis-wise broadcast - new_partitions = cls.broadcast_axis( + new_partitions = cls.apply_axis_partitions( axis=axis ^ 1, left=left, right=right, apply_func=apply_func, - keep_partitioning=True, ) return new_partitions From 8510dc1a9bfd6f649fa0a939e40bc0ac8b08dd0b Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Thu, 11 Jul 2024 09:54:39 +0000 Subject: [PATCH 03/11] fix lint --- modin/core/dataframe/pandas/partitioning/partition_manager.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 0a422d04159..13fb33ef023 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -513,10 +513,6 @@ def apply_axis_partitions( Left partitions. right : NumPy 2D array Right partitions. - keep_partitioning : boolean, default: False - The flag to keep partition boundaries for Modin Frame if possible. - Setting it to True disables shuffling data from one partition to another in case the resulting - number of splits is equal to the initial number of splits. Returns ------- From d1ab6aedd18bf2cce4233e06fa67e6ee2bb0d769 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Mon, 15 Jul 2024 13:40:30 +0000 Subject: [PATCH 04/11] refactoring --- .../dataframe/pandas/dataframe/dataframe.py | 4 +- .../pandas/partitioning/partition_manager.py | 76 ++++--------------- .../storage_formats/pandas/query_compiler.py | 10 +-- 3 files changed, 20 insertions(+), 70 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5d46d4b3190..5456f28f127 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3265,7 +3265,9 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - new_frame = self._partition_mgr_cls.apply(axis, func, left_parts, right_parts) + new_frame = self._partition_mgr_cls.broadcast_apply( + axis, func, left_parts, right_parts + ) if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 13fb33ef023..25bb24a1b46 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -339,7 +339,9 @@ def groupby_reduce( f"the number of partitions along {axis=} is not equal: " + f"{partitions.shape[axis]} != {by.shape[axis]}" ) - mapped_partitions = cls.apply(axis, map_func, left=partitions, right=by) + mapped_partitions = cls.broadcast_apply( + axis, map_func, left=partitions, right=by + ) else: mapped_partitions = cls.map_partitions(partitions, map_func) @@ -438,7 +440,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def broadcast_apply(cls, axis, apply_func, left, right): + def base_broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -491,57 +493,6 @@ def map_func(df, *others): ] ) - @classmethod - @wait_computations_if_benchmark_mode - def apply_axis_partitions( - cls, - axis, - apply_func, - left, - right, - ): - """ - Broadcast the `right` partitions to `left` and apply `apply_func` along full `axis`. - - Parameters - ---------- - axis : {0, 1} - Axis to apply and broadcast over. - apply_func : callable - Function to apply. - left : NumPy 2D array - Left partitions. - right : NumPy 2D array - Right partitions. - - Returns - ------- - NumPy array - An array of partition objects. - - Notes - ----- - This method differs from `broadcast_axis_partitions` in that it does not send - all right partitions for each remote task based on the left partitions. - """ - preprocessed_map_func = cls.preprocess_func(apply_func) - left_partitions = cls.axis_partition(left, axis) - right_partitions = None if right is None else cls.axis_partition(right, axis) - - result_blocks = np.array( - [ - left_partitions[i].apply( - preprocessed_map_func, - other_axis_partition=right_partitions[i], - ) - for i in np.arange(len(left_partitions)) - ] - ) - # If we are mapping over columns, they are returned to use the same as - # rows, so we need to transpose the returned 2D NumPy array to return - # the structure to the correct order. - return result_blocks.T if not axis else result_blocks - @classmethod @wait_computations_if_benchmark_mode def broadcast_axis_partitions( @@ -553,6 +504,7 @@ def broadcast_axis_partitions( keep_partitioning=False, num_splits=None, apply_indices=None, + send_all_right=True, enumerate_partitions=False, lengths=None, apply_func_args=None, @@ -581,6 +533,8 @@ def broadcast_axis_partitions( then the number of splits is preserved. apply_indices : list of ints, default: None Indices of `axis ^ 1` to apply function over. + send_all_right: bool, default: True + Whether or not to pass all right axis partitions to each of the left axis partitions. enumerate_partitions : bool, default: False Whether or not to pass partition index into `apply_func`. Note that `apply_func` must be able to accept `partition_idx` kwarg. @@ -627,7 +581,6 @@ def broadcast_axis_partitions( # load-balance the data as well. kw = { "num_splits": num_splits, - "other_axis_partition": right_partitions, "maintain_partitioning": keep_partitioning, } if lengths: @@ -642,6 +595,9 @@ def broadcast_axis_partitions( left_partitions[i].apply( preprocessed_map_func, *(apply_func_args if apply_func_args else []), + other_axis_partition=( + right_partitions if send_all_right else right_partitions[i] + ), **kw, **({"partition_idx": idx} if enumerate_partitions else {}), **kwargs, @@ -699,7 +655,7 @@ def base_map_partitions( @classmethod @wait_computations_if_benchmark_mode - def apply( + def broadcast_apply( cls, axis, apply_func, @@ -732,12 +688,9 @@ def apply( # partitions of the left and right dataframes are possible for the `apply`, # as a result of which it is necessary to merge partitions on both axes at once, # which leads to large slowdowns. - if ( - np.prod(left.shape) <= 1.5 * CpuCount.get() - or left.shape[axis] < CpuCount.get() // 5 - ): + if np.prod(left.shape) <= 1.5 * CpuCount.get(): # block-wise broadcast - new_partitions = cls.broadcast_apply( + new_partitions = cls.base_broadcast_apply( axis, apply_func, left, @@ -745,11 +698,12 @@ def apply( ) else: # axis-wise broadcast - new_partitions = cls.apply_axis_partitions( + new_partitions = cls.broadcast_axis_partitions( axis=axis ^ 1, left=left, right=right, apply_func=apply_func, + send_all_right=False, ) return new_partitions diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 3581516a638..a0c5b8d6522 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -45,7 +45,7 @@ from pandas.core.indexing import check_bool_indexer from pandas.errors import DataError -from modin.config import CpuCount, RangePartitioning +from modin.config import RangePartitioning from modin.core.dataframe.algebra import ( Binary, Fold, @@ -3157,14 +3157,8 @@ def dropna(self, **kwargs): lib.no_default, None, ) - # FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394 - # if there are too many partitions then all non-full-axis implementations start acting very badly. - # The here threshold is pretty random though it works fine on simple scenarios - processable_amount_of_partitions = ( - self._modin_frame.num_parts < CpuCount.get() * 32 - ) - if is_column_wise and no_thresh_passed and processable_amount_of_partitions: + if is_column_wise and no_thresh_passed: how = kwargs.get("how", "any") subset = kwargs.get("subset") how = "any" if how in (lib.no_default, None) else how From 2bb062fe99b41903b219e4ef4fdc84f02c25b601 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Tue, 16 Jul 2024 13:14:01 +0000 Subject: [PATCH 05/11] fix lint --- .../pandas/partitioning/partition_manager.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 25bb24a1b46..f8b8d73dff9 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -533,7 +533,7 @@ def broadcast_axis_partitions( then the number of splits is preserved. apply_indices : list of ints, default: None Indices of `axis ^ 1` to apply function over. - send_all_right: bool, default: True + send_all_right : bool, default: True Whether or not to pass all right axis partitions to each of the left axis partitions. enumerate_partitions : bool, default: False Whether or not to pass partition index into `apply_func`. @@ -663,8 +663,7 @@ def broadcast_apply( right, ): """ - Broadcast the `right` partitions to `left` and apply `apply_func` function - using different approaches to achieve the best performance. + Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance. Parameters ---------- @@ -682,12 +681,9 @@ def broadcast_apply( np.ndarray NumPy array of result partition objects. """ - # The condition for the execution of `broadcast_apply` is different from - # the same condition in the `map_partitions`, since the columnar partitioning approach - # cannot be implemented for the `apply`. This is due to the fact that different - # partitions of the left and right dataframes are possible for the `apply`, - # as a result of which it is necessary to merge partitions on both axes at once, - # which leads to large slowdowns. + # The `broadcast_apply` runtime condition differs from + # the same condition in `map_partitions` because the columnar + # approach for `broadcast_apply` results in a slowdown. if np.prod(left.shape) <= 1.5 * CpuCount.get(): # block-wise broadcast new_partitions = cls.base_broadcast_apply( From 7fd5a6703d5b41fe03e11d7c1d35206da9b11628 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Thu, 18 Jul 2024 09:23:14 +0000 Subject: [PATCH 06/11] fix keep_partitioning --- .../dataframe/pandas/partitioning/partition_manager.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index f8b8d73dff9..07a4d2de027 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -504,7 +504,7 @@ def broadcast_axis_partitions( keep_partitioning=False, num_splits=None, apply_indices=None, - send_all_right=True, + broadcast_all=True, enumerate_partitions=False, lengths=None, apply_func_args=None, @@ -533,7 +533,7 @@ def broadcast_axis_partitions( then the number of splits is preserved. apply_indices : list of ints, default: None Indices of `axis ^ 1` to apply function over. - send_all_right : bool, default: True + broadcast_all : bool, default: True Whether or not to pass all right axis partitions to each of the left axis partitions. enumerate_partitions : bool, default: False Whether or not to pass partition index into `apply_func`. @@ -596,7 +596,7 @@ def broadcast_axis_partitions( preprocessed_map_func, *(apply_func_args if apply_func_args else []), other_axis_partition=( - right_partitions if send_all_right else right_partitions[i] + right_partitions if broadcast_all else right_partitions[i] ), **kw, **({"partition_idx": idx} if enumerate_partitions else {}), @@ -699,7 +699,8 @@ def broadcast_apply( left=left, right=right, apply_func=apply_func, - send_all_right=False, + broadcast_all=False, + keep_partitioning=True, ) return new_partitions From 1ade9c0c92c9b02a3e0de89cbc68f6a23cc9f774 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Thu, 1 Aug 2024 09:47:43 +0000 Subject: [PATCH 07/11] Change DP conditions --- .../dataframe/pandas/dataframe/dataframe.py | 45 ++++++++++++++-- .../pandas/partitioning/partition_manager.py | 53 +------------------ .../storage_formats/pandas/query_compiler.py | 7 ++- 3 files changed, 49 insertions(+), 56 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 5456f28f127..a4d9c1a515e 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -33,6 +33,7 @@ from pandas.core.indexes.api import Index, RangeIndex from modin.config import ( + CpuCount, Engine, IsRayCluster, MinColumnPartitionSize, @@ -211,6 +212,22 @@ def num_parts(self) -> int: """ return np.prod(self._partitions.shape) + @property + def size(self) -> Optional[int]: + """ + Get an int representing the number of elements in this frame, if known. + + Returns + ------- + int or None + """ + if self.has_index_cache and self.has_columns_cache: + return len(self.index) * len(self.columns) + elif self._row_lengths_cache and self._column_widths_cache: + return sum(self._row_lengths_cache) * sum(self._column_widths_cache) + else: + return None + @property def row_lengths(self): """ @@ -3265,9 +3282,31 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) + # check the conditions for use of dynamic partitioning + use_dynamic_partitioning = False + if self.num_parts <= 1.5 * CpuCount.get(): + use_dynamic_partitioning = True + + # When the frame is large, dynamic partitioning + # performs worse than the based approach + frame_size = self.size + if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7): + use_dynamic_partitioning = False + + if use_dynamic_partitioning: + new_frame = self._partition_mgr_cls.broadcast_axis_partitions( + axis=axis ^ 1, + left=left_parts, + right=right_parts, + apply_func=func, + broadcast_all=False, + keep_partitioning=True, + ) + else: + new_frame = self._partition_mgr_cls.broadcast_apply( + axis, func, left_parts, right_parts + ) + if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 07a4d2de027..e07747afd47 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -440,7 +440,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def base_broadcast_apply(cls, axis, apply_func, left, right): + def broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -653,57 +653,6 @@ def base_map_partitions( ] ) - @classmethod - @wait_computations_if_benchmark_mode - def broadcast_apply( - cls, - axis, - apply_func, - left, - right, - ): - """ - Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance. - - Parameters - ---------- - axis : {0, 1} - Axis to apply and broadcast over. - apply_func : callable - Function to apply. - left : np.ndarray - NumPy array of left partitions. - right : np.ndarray - NumPy array of right partitions. - - Returns - ------- - np.ndarray - NumPy array of result partition objects. - """ - # The `broadcast_apply` runtime condition differs from - # the same condition in `map_partitions` because the columnar - # approach for `broadcast_apply` results in a slowdown. - if np.prod(left.shape) <= 1.5 * CpuCount.get(): - # block-wise broadcast - new_partitions = cls.base_broadcast_apply( - axis, - apply_func, - left, - right, - ) - else: - # axis-wise broadcast - new_partitions = cls.broadcast_axis_partitions( - axis=axis ^ 1, - left=left, - right=right, - apply_func=apply_func, - broadcast_all=False, - keep_partitioning=True, - ) - return new_partitions - @classmethod @wait_computations_if_benchmark_mode def map_partitions( diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index a0c5b8d6522..f6c3892fc09 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -46,6 +46,7 @@ from pandas.errors import DataError from modin.config import RangePartitioning +from modin.config.envvars import CpuCount from modin.core.dataframe.algebra import ( Binary, Fold, @@ -3157,8 +3158,12 @@ def dropna(self, **kwargs): lib.no_default, None, ) + # The map reduce approach works well for frames with few columnar partitions + processable_amount_of_partitions = ( + self._modin_frame.num_parts < CpuCount.get() * 32 + ) - if is_column_wise and no_thresh_passed: + if is_column_wise and no_thresh_passed and processable_amount_of_partitions: how = kwargs.get("how", "any") subset = kwargs.get("subset") how = "any" if how in (lib.no_default, None) else how From 0ff9097d50adc8c34931f8fd26d2aa887904bf92 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Tue, 13 Aug 2024 08:25:53 +0000 Subject: [PATCH 08/11] Revert 'Change DP conditions' --- .../dataframe/pandas/dataframe/dataframe.py | 45 ++-------------- .../pandas/partitioning/partition_manager.py | 53 ++++++++++++++++++- 2 files changed, 55 insertions(+), 43 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index a4d9c1a515e..5456f28f127 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -33,7 +33,6 @@ from pandas.core.indexes.api import Index, RangeIndex from modin.config import ( - CpuCount, Engine, IsRayCluster, MinColumnPartitionSize, @@ -212,22 +211,6 @@ def num_parts(self) -> int: """ return np.prod(self._partitions.shape) - @property - def size(self) -> Optional[int]: - """ - Get an int representing the number of elements in this frame, if known. - - Returns - ------- - int or None - """ - if self.has_index_cache and self.has_columns_cache: - return len(self.index) * len(self.columns) - elif self._row_lengths_cache and self._column_widths_cache: - return sum(self._row_lengths_cache) * sum(self._column_widths_cache) - else: - return None - @property def row_lengths(self): """ @@ -3282,31 +3265,9 @@ def broadcast_apply( axis ), self.copy_axis_cache(axis) - # check the conditions for use of dynamic partitioning - use_dynamic_partitioning = False - if self.num_parts <= 1.5 * CpuCount.get(): - use_dynamic_partitioning = True - - # When the frame is large, dynamic partitioning - # performs worse than the based approach - frame_size = self.size - if frame_size and (frame_size >= 4 * 10**9 or len(self) >= 10**7): - use_dynamic_partitioning = False - - if use_dynamic_partitioning: - new_frame = self._partition_mgr_cls.broadcast_axis_partitions( - axis=axis ^ 1, - left=left_parts, - right=right_parts, - apply_func=func, - broadcast_all=False, - keep_partitioning=True, - ) - else: - new_frame = self._partition_mgr_cls.broadcast_apply( - axis, func, left_parts, right_parts - ) - + new_frame = self._partition_mgr_cls.broadcast_apply( + axis, func, left_parts, right_parts + ) if isinstance(dtypes, str) and dtypes == "copy": dtypes = self.copy_dtypes_cache() diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index e07747afd47..07a4d2de027 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -440,7 +440,7 @@ def get_partitions(index): @classmethod @wait_computations_if_benchmark_mode - def broadcast_apply(cls, axis, apply_func, left, right): + def base_broadcast_apply(cls, axis, apply_func, left, right): """ Broadcast the `right` partitions to `left` and apply `apply_func` function. @@ -653,6 +653,57 @@ def base_map_partitions( ] ) + @classmethod + @wait_computations_if_benchmark_mode + def broadcast_apply( + cls, + axis, + apply_func, + left, + right, + ): + """ + Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance. + + Parameters + ---------- + axis : {0, 1} + Axis to apply and broadcast over. + apply_func : callable + Function to apply. + left : np.ndarray + NumPy array of left partitions. + right : np.ndarray + NumPy array of right partitions. + + Returns + ------- + np.ndarray + NumPy array of result partition objects. + """ + # The `broadcast_apply` runtime condition differs from + # the same condition in `map_partitions` because the columnar + # approach for `broadcast_apply` results in a slowdown. + if np.prod(left.shape) <= 1.5 * CpuCount.get(): + # block-wise broadcast + new_partitions = cls.base_broadcast_apply( + axis, + apply_func, + left, + right, + ) + else: + # axis-wise broadcast + new_partitions = cls.broadcast_axis_partitions( + axis=axis ^ 1, + left=left, + right=right, + apply_func=apply_func, + broadcast_all=False, + keep_partitioning=True, + ) + return new_partitions + @classmethod @wait_computations_if_benchmark_mode def map_partitions( From 39421e0f7186a0a9521c907540ec2937cf588fa4 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Mon, 19 Aug 2024 12:24:49 +0000 Subject: [PATCH 09/11] Use DynamicPartitioning env variable --- .../dataframe/pandas/partitioning/partition_manager.py | 7 +++---- modin/core/storage_formats/pandas/query_compiler.py | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 07a4d2de027..cb207f64d4e 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -681,10 +681,7 @@ def broadcast_apply( np.ndarray NumPy array of result partition objects. """ - # The `broadcast_apply` runtime condition differs from - # the same condition in `map_partitions` because the columnar - # approach for `broadcast_apply` results in a slowdown. - if np.prod(left.shape) <= 1.5 * CpuCount.get(): + if not DynamicPartitioning.get(): # block-wise broadcast new_partitions = cls.base_broadcast_apply( axis, @@ -693,6 +690,8 @@ def broadcast_apply( right, ) else: + # The dynamic partitioning behavior of `broadcast_apply` differs from that of `map_partitions`, + # since the columnar approach for `broadcast_apply` results in slowdown. # axis-wise broadcast new_partitions = cls.broadcast_axis_partitions( axis=axis ^ 1, diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index f6c3892fc09..410bd2b50d8 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -45,8 +45,7 @@ from pandas.core.indexing import check_bool_indexer from pandas.errors import DataError -from modin.config import RangePartitioning -from modin.config.envvars import CpuCount +from modin.config import CpuCount, RangePartitioning from modin.core.dataframe.algebra import ( Binary, Fold, From 464055f7b2ef6236389d36dc3c5c2c4ea72ad917 Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Thu, 22 Aug 2024 12:28:17 +0000 Subject: [PATCH 10/11] Add test for empty partitions after groupby --- modin/tests/pandas/test_groupby.py | 34 +++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/modin/tests/pandas/test_groupby.py b/modin/tests/pandas/test_groupby.py index b82473c674b..12c0b888ff2 100644 --- a/modin/tests/pandas/test_groupby.py +++ b/modin/tests/pandas/test_groupby.py @@ -21,7 +21,13 @@ import pytest import modin.pandas as pd -from modin.config import IsRayCluster, NPartitions, RangePartitioning, StorageFormat +from modin.config import ( + DynamicPartitioning, + IsRayCluster, + NPartitions, + RangePartitioning, + StorageFormat, +) from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, @@ -2431,6 +2437,32 @@ def test_multi_column_groupby_different_partitions( ) +def test_empty_partitions_after_groupby(): + DynamicPartitioning.put(True) + + def func_to_apply(grp): + return grp.agg( + { + list(test_data_values[0].keys())[1]: "sum", + list(test_data_values[0].keys())[-1]: "sum", + } + ) + + data = test_data_values[0] + md_df, pd_df = create_test_dfs(data) + by = pd_df.columns[0] + + md_grp, pd_grp = ( + md_df.groupby(by), + pd_df.groupby(by), + ) + eval_general( + md_grp, + pd_grp, + func_to_apply, + ) + + @pytest.mark.parametrize( "by", [ From 018e73bb4d508a4f419be647fbbb3c0f272dc89d Mon Sep 17 00:00:00 2001 From: Kirill Suvorov Date: Thu, 22 Aug 2024 13:02:13 +0000 Subject: [PATCH 11/11] use cfg context --- modin/tests/pandas/test_groupby.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/modin/tests/pandas/test_groupby.py b/modin/tests/pandas/test_groupby.py index 12c0b888ff2..36987c0d931 100644 --- a/modin/tests/pandas/test_groupby.py +++ b/modin/tests/pandas/test_groupby.py @@ -22,11 +22,11 @@ import modin.pandas as pd from modin.config import ( - DynamicPartitioning, IsRayCluster, NPartitions, RangePartitioning, StorageFormat, + context, ) from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy from modin.core.dataframe.pandas.partitioning.axis_partition import ( @@ -2438,8 +2438,6 @@ def test_multi_column_groupby_different_partitions( def test_empty_partitions_after_groupby(): - DynamicPartitioning.put(True) - def func_to_apply(grp): return grp.agg( { @@ -2452,15 +2450,16 @@ def func_to_apply(grp): md_df, pd_df = create_test_dfs(data) by = pd_df.columns[0] - md_grp, pd_grp = ( - md_df.groupby(by), - pd_df.groupby(by), - ) - eval_general( - md_grp, - pd_grp, - func_to_apply, - ) + with context(DynamicPartitioning=True): + md_grp, pd_grp = ( + md_df.groupby(by), + pd_df.groupby(by), + ) + eval_general( + md_grp, + pd_grp, + func_to_apply, + ) @pytest.mark.parametrize(