diff --git a/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/index.rst b/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/index.rst
index a08e1345fe3..b6ccbef53fc 100644
--- a/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/index.rst
+++ b/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/index.rst
@@ -34,7 +34,7 @@ specifically for the `PandasOnDask` execution.
 
 * :doc:`PandasOnDaskDataframe <dataframe>`
 * :doc:`PandasOnDaskDataframePartition <partitioning/partition>`
-* :doc:`PandasOnDaskDataframeAxisPartition <partitioning/axis_partition>`
+* :doc:`PandasOnDaskDataframeVirtualPartition <partitioning/virtual_partition>`
 * :doc:`PandasOnDaskDataframePartitionManager <partitioning/partition_manager>`
 
 .. toctree::
@@ -42,7 +42,7 @@ specifically for the `PandasOnDask` execution.
 
   dataframe
   partitioning/partition
-  partitioning/axis_partition
+  partitioning/virtual_partition
   partitioning/partition_manager
 
@@ -80,4 +80,4 @@ the user query to execute it on Dask workers. Then, the :py:class:`~modin.core.e
 that will be written into the file in parallel in Dask workers.
 
 .. note::
-  Currently, data egress uses default `pandas` implementation for `pandas on Dask` execution.
\ No newline at end of file
+  Currently, data egress uses default `pandas` implementation for `pandas on Dask` execution.
diff --git a/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/axis_partition.rst b/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.rst
similarity index 78%
rename from docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/axis_partition.rst
rename to docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.rst
index 9a2159905cf..f4e4131bdab 100644
--- a/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/axis_partition.rst
+++ b/docs/flow/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.rst
@@ -1,14 +1,14 @@
-PandasOnDaskDataframeAxisPartition
-""""""""""""""""""""""""""""""""""
+PandasOnDaskDataframeVirtualPartition
+"""""""""""""""""""""""""""""""""""""
 
 The class is the specific implementation of :py:class:`~modin.core.dataframe.pandas.partitioning.virtual_partition.PandasDataframeAxisPartition`,
 providing the API to perform operations on an axis (column or row) partition using Dask as the execution engine.
 The axis partition is a wrapper over a list of block partitions that are stored in this class.
 
 Public API
 ----------
 
-.. autoclass:: modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition.PandasOnDaskDataframeAxisPartition
+.. autoclass:: modin.core.execution.dask.implementations.pandas_on_dask.partitioning.virtual_partition.PandasOnDaskDataframeVirtualPartition
   :members:
 
 PandasOnDaskDataframeColumnPartition
diff --git a/docs/release_notes/release_notes-0.16.0.rst b/docs/release_notes/release_notes-0.16.0.rst
index d0f2ccb4091..861ac9931b1 100644
--- a/docs/release_notes/release_notes-0.16.0.rst
+++ b/docs/release_notes/release_notes-0.16.0.rst
@@ -50,6 +50,8 @@ Key Features and Updates
   * FEAT-#4619: Integrate mypy static type checking (#4620)
 * New Features
   * FEAT-4463: Add experimental fuzzydata integration for testing against a randomized dataframe workflow (#4556)
+  * FEAT-#4419: Extend virtual partitioning API to pandas on Dask (#4420)
+
 
 Contributors
 ------------
@@ -62,4 +64,4 @@ Contributors
 @RehanSD
 @helmeleegy
 @anmyachev
-@d33bs
\ No newline at end of file
+@d33bs
diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py
index 69379a67d1d..78075fa015e 100644
--- a/modin/core/dataframe/pandas/partitioning/partition_manager.py
+++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -27,7 +27,7 @@ from modin.error_message import ErrorMessage
 from modin.core.storage_formats.pandas.utils import compute_chunksize
 from modin.core.dataframe.pandas.utils import concatenate
-from modin.config import NPartitions, ProgressBar, BenchmarkMode
+from modin.config import NPartitions, ProgressBar, BenchmarkMode, Engine, StorageFormat
 
 import os
 
@@ -615,11 +615,15 @@ def concat(cls, axis, left_parts, right_parts):
             to_concat = (
                 [left_parts] + right_parts if left_parts.size != 0 else right_parts
             )
-            return (
+            result = (
                 np.concatenate(to_concat, axis=axis) if len(to_concat) else left_parts
             )
         else:
-            return np.append(left_parts, right_parts, axis=axis)
+            result = np.append(left_parts, right_parts, axis=axis)
+        if axis == 0:
+            return cls.rebalance_partitions(result)
+        else:
+            return result
 
     @classmethod
     def to_pandas(cls, partitions):
@@ -1292,7 +1296,15 @@ def finalize(cls, partitions):
     @classmethod
     def rebalance_partitions(cls, partitions):
         """
-        Return the provided array of partitions without rebalancing it.
+        Rebalance a 2-d array of partitions if we are using ``PandasOnRay`` or ``PandasOnDask`` executions.
+
+        For all other executions, the partitions are returned unchanged.
+
+        Rebalance the partitions by building a new array
+        of partitions out of the original ones so that:
+
+        - If all partitions have a length, each new partition has roughly the same number of rows.
+        - Otherwise, each new partition spans roughly the same number of old partitions.
 
         Parameters
         ----------
@@ -1302,6 +1314,103 @@ def rebalance_partitions(cls, partitions):
 
         Returns
         -------
         np.ndarray
-            The same 2-d array.
+            The same 2-d array of partitions, or a new 2-d array of rebalanced
+            partitions, depending on the execution engine and storage format.
         """
+        if Engine.get() in ["Ray", "Dask"] and StorageFormat.get() == "Pandas":
+            # Rebalancing partitions is currently only implemented for PandasOnRay and PandasOnDask.
+            # We rebalance when the ratio of the number of existing partitions to
+            # the ideal number of partitions is larger than this threshold. The
+            # threshold is a heuristic that may need to be tuned for performance.
+ max_excess_of_num_partitions = 1.5 + num_existing_partitions = partitions.shape[0] + ideal_num_new_partitions = NPartitions.get() + if ( + num_existing_partitions + <= ideal_num_new_partitions * max_excess_of_num_partitions + ): + return partitions + # If any partition has an unknown length, give each axis partition + # roughly the same number of row partitions. We use `_length_cache` here + # to avoid materializing any unmaterialized lengths. + if any( + partition._length_cache is None + for row in partitions + for partition in row + ): + # We need each partition to go into an axis partition, but the + # number of axis partitions may not evenly divide the number of + # partitions. + chunk_size = compute_chunksize( + num_existing_partitions, ideal_num_new_partitions, min_block_size=1 + ) + return np.array( + [ + cls.column_partitions( + partitions[i : i + chunk_size], + full_axis=False, + ) + for i in range( + 0, + num_existing_partitions, + chunk_size, + ) + ] + ) + + # If we know the number of rows in every partition, then we should try + # instead to give each new partition roughly the same number of rows. + new_partitions = [] + # `start` is the index of the first existing partition that we want to + # put into the current new partition. + start = 0 + total_rows = sum(part.length() for part in partitions[:, 0]) + ideal_partition_size = compute_chunksize( + total_rows, ideal_num_new_partitions, min_block_size=1 + ) + for _ in range(ideal_num_new_partitions): + # We might pick up old partitions too quickly and exhaust all of them. + if start >= len(partitions): + break + # `stop` is the index of the last existing partition so far that we + # want to put into the current new partition. + stop = start + partition_size = partitions[start][0].length() + # Add existing partitions into the current new partition until the + # number of rows in the new partition hits `ideal_partition_size`. + while stop < len(partitions) and partition_size < ideal_partition_size: + stop += 1 + if stop < len(partitions): + partition_size += partitions[stop][0].length() + # If the new partition is larger than we want, split the last + # current partition that it contains into two partitions, where + # the first partition has just enough rows to make the current + # new partition have length `ideal_partition_size`, and the second + # partition has the remainder. 
+            if partition_size > ideal_partition_size * max_excess_of_num_partitions:
+                new_last_partition_size = ideal_partition_size - sum(
+                    row[0].length() for row in partitions[start:stop]
+                )
+                partitions = np.insert(
+                    partitions,
+                    stop + 1,
+                    [
+                        obj.mask(slice(new_last_partition_size, None), slice(None))
+                        for obj in partitions[stop]
+                    ],
+                    0,
+                )
+                partitions[stop, :] = [
+                    obj.mask(slice(None, new_last_partition_size), slice(None))
+                    for obj in partitions[stop]
+                ]
+                partition_size = ideal_partition_size
+            new_partitions.append(
+                cls.column_partitions(
+                    (partitions[start : stop + 1]),
+                    full_axis=partition_size == total_rows,
+                )
+            )
+            start = stop + 1
+        return np.array(new_partitions)
         return partitions
diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
index abd87748e4d..7d0ec124229 100644
--- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
+++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py
@@ -15,6 +15,7 @@
 
 from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
 from ..partitioning.partition_manager import PandasOnDaskDataframePartitionManager
+from modin.core.execution.dask.common.engine_wrapper import DaskWrapper
 
 
 class PandasOnDaskDataframe(PandasDataframe):
@@ -41,10 +42,49 @@ class PandasOnDaskDataframe(PandasDataframe):
 
     _partition_mgr_cls = PandasOnDaskDataframePartitionManager
 
+    def _get_partition_size_along_axis(self, partition, axis=0):
+        """
+        Compute the length along the specified axis of the specified partition.
+
+        Parameters
+        ----------
+        partition : ``PandasOnDaskDataframeVirtualPartition`` or ``PandasOnDaskDataframePartition``
+            The partition whose size to compute.
+        axis : int, default: 0
+            The axis along which to compute size.
+
+        Returns
+        -------
+        list
+            A list of lengths along the specified axis that sum to the overall length of the partition
+            along the specified axis.
+
+        Notes
+        -----
+        This utility function is used to ensure that computation occurs asynchronously
+        across all partitions, whether the partitions are virtual or physical.
+        """
+        if isinstance(partition, self._partition_mgr_cls._partition_class):
+            return [
+                partition.apply(
+                    lambda df: len(df) if not axis else len(df.columns)
+                )._data
+            ]
+        elif partition.axis == axis:
+            return [
+                ptn.apply(lambda df: len(df) if not axis else len(df.columns))._data
+                for ptn in partition.list_of_partitions_to_combine
+            ]
+        return [
+            partition.list_of_partitions_to_combine[0]
+            .apply(lambda df: len(df) if not axis else (len(df.columns)))
+            ._data
+        ]
+
     @property
     def _row_lengths(self):
         """
-        Compute the row partitions lengths if they are not cached.
+        Compute the lengths of the row partitions if they are not cached.
 
         Returns
         -------
         list
             A list of row partitions lengths.
         """
         if self._row_lengths_cache is None:
-            self._row_lengths_cache = (
-                self._partition_mgr_cls.get_objects_from_partitions(
-                    [obj.apply(lambda df: len(df)) for obj in self._partitions.T[0]]
-                )
+            row_lengths_list = DaskWrapper.materialize(
+                [
+                    self._get_partition_size_along_axis(obj, axis=0)
+                    for obj in self._partitions.T[0]
+                ]
             )
+            self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list]
         return self._row_lengths_cache
 
     @property
@@ -70,12 +112,13 @@ def _column_widths(self):
         A list of column partitions widths.
""" if self._column_widths_cache is None: - self._column_widths_cache = ( - self._partition_mgr_cls.get_objects_from_partitions( - [ - obj.apply(lambda df: len(df.columns)) - for obj in self._partitions[0] - ] - ) + col_widths_list = DaskWrapper.materialize( + [ + self._get_partition_size_along_axis(obj, axis=1) + for obj in self._partitions[0] + ] ) + self._column_widths_cache = [ + sum(width_list) for width_list in col_widths_list + ] return self._column_widths_cache diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py index 7f466a078bd..aa20066bedf 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py @@ -15,6 +15,8 @@ from distributed import Future from distributed.utils import get_ip +from dask.distributed import wait + import pandas from modin.core.dataframe.pandas.partitioning.axis_partition import ( @@ -24,35 +26,120 @@ from modin.core.execution.dask.common.engine_wrapper import DaskWrapper -class PandasOnDaskDataframeAxisPartition(PandasDataframeAxisPartition): +class PandasOnDaskDataframeVirtualPartition(PandasDataframeAxisPartition): """ The class implements the interface in ``PandasDataframeAxisPartition``. Parameters ---------- - list_of_blocks : list - List of ``PandasOnDaskDataframePartition`` objects. + list_of_blocks : Union[list, PandasOnDaskDataframePartition] + List of ``PandasOnDaskDataframePartition`` and + ``PandasOnDaskDataframeVirtualPartition`` objects, or a single + ``PandasOnDaskDataframePartition``. get_ip : bool, default: False Whether to get node IP addresses of conforming partitions or not. full_axis : bool, default: True Whether or not the virtual partition encompasses the whole axis. + call_queue : list, optional + A list of tuples (callable, args, kwargs) that contains deferred calls. """ - def __init__(self, list_of_blocks, get_ip=False, full_axis=True): - if not full_axis: - raise NotImplementedError( - "Pandas on Dask execution requires full-axis partitions." + axis = None + + def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None): + if isinstance(list_of_blocks, PandasOnDaskDataframePartition): + list_of_blocks = [list_of_blocks] + self.call_queue = call_queue or [] + self.full_axis = full_axis + # In the simple case, none of the partitions that will compose this + # partition are themselves virtual partition. The partitions that will + # be combined are just the partitions as given to the constructor. + if not any( + isinstance(obj, PandasOnDaskDataframeVirtualPartition) + for obj in list_of_blocks + ): + self.list_of_partitions_to_combine = list_of_blocks + return + # Check that all axis are the same in `list_of_blocks` + # We should never have mismatching axis in the current implementation. We add this + # defensive assertion to ensure that undefined behavior does not happen. 
+        assert (
+            len(
+                set(
+                    obj.axis
+                    for obj in list_of_blocks
+                    if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
+                )
             )
-        for obj in list_of_blocks:
-            obj.drain_call_queue()
-        # Unwrap from PandasDataframePartition object for ease of use
-        self.list_of_blocks = [obj._data for obj in list_of_blocks]
-        if get_ip:
-            self.list_of_ips = [obj._ip_cache for obj in list_of_blocks]
+            == 1
+        )
+        # When the axis of all virtual partitions matches this axis,
+        # extend and combine the lists of physical partitions.
+        if (
+            next(
+                obj
+                for obj in list_of_blocks
+                if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
+            ).axis
+            == self.axis
+        ):
+            new_list_of_blocks = []
+            for obj in list_of_blocks:
+                new_list_of_blocks.extend(
+                    obj.list_of_partitions_to_combine
+                ) if isinstance(
+                    obj, PandasOnDaskDataframeVirtualPartition
+                ) else new_list_of_blocks.append(
+                    obj
+                )
+            self.list_of_partitions_to_combine = new_list_of_blocks
+        # Materialize the partitions if the axis of this virtual partition does
+        # not match the axis of the virtual partitions it is built from.
+        else:
+            self.list_of_partitions_to_combine = [
+                obj.force_materialization().list_of_partitions_to_combine[0]
+                if isinstance(obj, PandasOnDaskDataframeVirtualPartition)
+                else obj
+                for obj in list_of_blocks
+            ]
 
     partition_type = PandasOnDaskDataframePartition
     instance_type = Future
 
+    @property
+    def list_of_blocks(self):
+        """
+        Get the list of physical partition objects that compose this partition.
+
+        Returns
+        -------
+        List
+            A list of ``distributed.Future``.
+        """
+        # Defer draining call queue until we get the partitions
+        # TODO Look into draining call queue at the same time as the task
+        result = [None] * len(self.list_of_partitions_to_combine)
+        for idx, partition in enumerate(self.list_of_partitions_to_combine):
+            partition.drain_call_queue()
+            result[idx] = partition._data
+        return result
+
+    @property
+    def list_of_ips(self):
+        """
+        Get the IPs holding the physical objects composing this partition.
+
+        Returns
+        -------
+        List
+            A list of IPs as ``distributed.Future`` or str.
+        """
+        # Defer draining call queue until we get the ip address
+        result = [None] * len(self.list_of_partitions_to_combine)
+        for idx, partition in enumerate(self.list_of_partitions_to_combine):
+            partition.drain_call_queue()
+            result[idx] = partition._ip_cache
+        return result
+
     @classmethod
     def deploy_axis_func(
         cls,
@@ -165,8 +252,209 @@ def _wrap_partitions(self, partitions):
             for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
         ]
 
+    def apply(
+        self,
+        func,
+        *args,
+        num_splits=None,
+        other_axis_partition=None,
+        maintain_partitioning=True,
+        **kwargs,
+    ):
+        """
+        Apply a function to this axis partition along the full axis.
+
+        Parameters
+        ----------
+        func : callable
+            The function to apply.
+        *args : iterable
+            Additional positional arguments to be passed in `func`.
+        num_splits : int, default: None
+            The number of times to split the result object.
+        other_axis_partition : PandasDataframeAxisPartition, default: None
+            Another `PandasDataframeAxisPartition` object to be applied
+            to func. This is for operations that are between two data sets.
+        maintain_partitioning : bool, default: True
+            Whether to keep the partitioning in the same
+            orientation as it was previously or not. This is important because we may be
+            operating on an individual AxisPartition and not touching the rest.
+            In this case, we have to return the partitioning to its previous
+            orientation (the lengths will remain the same). This is ignored between
+            two axis partitions.
+        **kwargs : dict
+            Additional keyword arguments to be passed in `func`.
+
+        Returns
+        -------
+        list
+            A list of `PandasOnDaskDataframeVirtualPartition` objects.
+        """
+        if not self.full_axis:
+            # If this is not a full axis partition, it already contains a subset of
+            # the full axis, so we shouldn't split the result further.
+            num_splits = 1
+        if len(self.call_queue) > 0:
+            self.drain_call_queue()
+        kwargs["args"] = args
+        result = super(PandasOnDaskDataframeVirtualPartition, self).apply(
+            func, num_splits, other_axis_partition, maintain_partitioning, **kwargs
+        )
+        if self.full_axis:
+            return result
+        else:
+            # If this is not a full axis partition, just take out the single split in the result.
+            return result[0]
+
+    def force_materialization(self, get_ip=False):
+        """
+        Materialize partitions into a single partition.
+
+        Parameters
+        ----------
+        get_ip : bool, default: False
+            Whether to get node ip address to a single partition or not.
+
+        Returns
+        -------
+        PandasOnDaskDataframeVirtualPartition
+            An axis partition containing only a single materialized partition.
+        """
+        materialized = super(
+            PandasOnDaskDataframeVirtualPartition, self
+        ).force_materialization(get_ip=get_ip)
+        self.list_of_partitions_to_combine = materialized.list_of_partitions_to_combine
+        return materialized
+
+    def mask(self, row_indices, col_indices):
+        """
+        Create (synchronously) a mask that extracts the indices provided.
+
+        Parameters
+        ----------
+        row_indices : list-like, slice or label
+            The row labels for the rows to extract.
+        col_indices : list-like, slice or label
+            The column labels for the columns to extract.
+
+        Returns
+        -------
+        PandasOnDaskDataframeVirtualPartition
+            A new ``PandasOnDaskDataframeVirtualPartition`` object,
+            materialized.
+        """
+        return (
+            self.force_materialization()
+            .list_of_partitions_to_combine[0]
+            .mask(row_indices, col_indices)
+        )
+
+    def to_pandas(self):
+        """
+        Convert the data in this partition to a ``pandas.DataFrame``.
+
+        Returns
+        -------
+        pandas.DataFrame
+            The data in this partition.
+        """
+        return self.force_materialization().list_of_partitions_to_combine[0].to_pandas()
+
+    _length_cache = None
+
+    def length(self):
+        """
+        Get the length of this partition.
+
+        Returns
+        -------
+        int
+            The length of the partition.
+        """
+        if self._length_cache is None:
+            if self.axis == 0:
+                self._length_cache = sum(
+                    obj.length() for obj in self.list_of_partitions_to_combine
+                )
+            else:
+                self._length_cache = self.list_of_partitions_to_combine[0].length()
+        return self._length_cache
+
+    _width_cache = None
+
+    def width(self):
+        """
+        Get the width of this partition.
+
+        Returns
+        -------
+        int
+            The width of the partition.
+        """
+        if self._width_cache is None:
+            if self.axis == 1:
+                self._width_cache = sum(
+                    obj.width() for obj in self.list_of_partitions_to_combine
+                )
+            else:
+                self._width_cache = self.list_of_partitions_to_combine[0].width()
+        return self._width_cache
+
+    def drain_call_queue(self, num_splits=None):
+        """
+        Execute all operations stored in this partition's call queue.
-class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeAxisPartition):
+
+        Parameters
+        ----------
+        num_splits : int, default: None
+            The number of times to split the result object.
+        """
+
+        def drain(df):
+            for func, args, kwargs in self.call_queue:
+                df = func(df, *args, **kwargs)
+            return df
+
+        drained = super(PandasOnDaskDataframeVirtualPartition, self).apply(
+            drain, num_splits=num_splits
+        )
+        self.list_of_partitions_to_combine = drained
+        self.call_queue = []
+
+    def wait(self):
+        """Wait for completion of computations on the object wrapped by the partition."""
+        self.drain_call_queue()
+        wait(self.list_of_blocks)
+
+    def add_to_apply_calls(self, func, *args, **kwargs):
+        """
+        Add a function to the call queue.
+
+        Parameters
+        ----------
+        func : callable
+            Function to be added to the call queue.
+        *args : iterable
+            Additional positional arguments to be passed in `func`.
+        **kwargs : dict
+            Additional keyword arguments to be passed in `func`.
+
+        Returns
+        -------
+        PandasOnDaskDataframeVirtualPartition
+            A new ``PandasOnDaskDataframeVirtualPartition`` object.
+
+        Notes
+        -----
+        The keyword arguments are sent as a dictionary.
+        """
+        return type(self)(
+            self.list_of_partitions_to_combine,
+            full_axis=self.full_axis,
+            call_queue=self.call_queue + [(func, args, kwargs)],
+        )
+
+
+class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeVirtualPartition):
     """
     The column partition implementation.
 
@@ -180,13 +468,15 @@ class PandasOnDaskDataframeColumnPartition(PandasOnDaskDataframeAxisPartition):
     get_ip : bool, default: False
         Whether to get node IP addresses to conforming partitions or not.
     full_axis : bool, default: True
-        Whether or not the virtual partition encompasses the whole axis.
+        Whether this partition spans an entire axis of the dataframe.
+    call_queue : list, optional
+        A list of tuples (callable, args, kwargs) that contains deferred calls.
     """
 
     axis = 0
 
 
-class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeAxisPartition):
+class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeVirtualPartition):
     """
     The row partition implementation.
 
@@ -200,7 +490,9 @@ class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeAxisPartition):
     get_ip : bool, default: False
         Whether to get node IP addresses to conforming partitions or not.
     full_axis : bool, default: True
-        Whether or not the virtual partition encompasses the whole axis.
+        Whether this partition spans an entire axis of the dataframe.
+    call_queue : list, optional
+        A list of tuples (callable, args, kwargs) that contains deferred calls.
""" axis = 1 diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index f525d2ada7b..e853fedfe44 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -16,10 +16,9 @@ import inspect import threading -import numpy as np import ray -from modin.config import ProgressBar, NPartitions +from modin.config import ProgressBar from modin.core.execution.ray.generic.partitioning import ( GenericRayDataframePartitionManager, ) @@ -29,7 +28,6 @@ ) from .partition import PandasOnRayDataframePartition from modin.core.execution.ray.generic.modin_aqp import call_progress_bar -from modin.core.storage_formats.pandas.utils import compute_chunksize from pandas._libs.lib import no_default @@ -111,153 +109,6 @@ def get_objects_from_partitions(cls, partitions): """ return ray.get([partition._data for partition in partitions]) - @classmethod - def concat(cls, axis, left_parts, right_parts): - """ - Concatenate the blocks of partitions with another set of blocks. - - Parameters - ---------- - axis : int - The axis to concatenate to. - left_parts : np.ndarray - NumPy array of partitions to concatenate with. - right_parts : np.ndarray or list - NumPy array of partitions to be concatenated. - - Returns - ------- - np.ndarray - A new NumPy array with concatenated partitions. - - Notes - ----- - Assumes that the `left_parts` and `right_parts` blocks are already the same - shape on the dimension (opposite `axis`) as the one being concatenated. A - ``ValueError`` will be thrown if this condition is not met. - """ - result = super(PandasOnRayDataframePartitionManager, cls).concat( - axis, left_parts, right_parts - ) - if axis == 0: - return cls.rebalance_partitions(result) - else: - return result - - @classmethod - def rebalance_partitions(cls, partitions): - """ - Rebalance a 2-d array of partitions. - - Rebalance the partitions by building a new array - of partitions out of the original ones so that: - - - If all partitions have a length, each new partition has roughly the same number of rows. - - Otherwise, each new partition spans roughly the same number of old partitions. - - Parameters - ---------- - partitions : np.ndarray - The 2-d array of partitions to rebalance. - - Returns - ------- - np.ndarray - A new NumPy array with rebalanced partitions. - """ - # We rebalance when the ratio of the number of existing partitions to - # the ideal number of partitions is larger than this threshold. The - # threshold is a heuristic that may need to be tuned for performance. - max_excess_of_num_partitions = 1.5 - num_existing_partitions = partitions.shape[0] - ideal_num_new_partitions = NPartitions.get() - if ( - num_existing_partitions - <= ideal_num_new_partitions * max_excess_of_num_partitions - ): - return partitions - # If any partition has an unknown length, give each axis partition - # roughly the same number of row partitions. We use `_length_cache` here - # to avoid materializing any unmaterialized lengths. - if any( - partition._length_cache is None for row in partitions for partition in row - ): - # We need each partition to go into an axis partition, but the - # number of axis partitions may not evenly divide the number of - # partitions. 
- chunk_size = compute_chunksize( - num_existing_partitions, ideal_num_new_partitions, min_block_size=1 - ) - return np.array( - [ - cls.column_partitions( - partitions[i : i + chunk_size], - full_axis=False, - ) - for i in range( - 0, - num_existing_partitions, - chunk_size, - ) - ] - ) - - # If we know the number of rows in every partition, then we should try - # instead to give each new partition roughly the same number of rows. - new_partitions = [] - # `start` is the index of the first existing partition that we want to - # put into the current new partition. - start = 0 - total_rows = sum(part.length() for part in partitions[:, 0]) - ideal_partition_size = compute_chunksize( - total_rows, ideal_num_new_partitions, min_block_size=1 - ) - for _ in range(ideal_num_new_partitions): - # We might pick up old partitions too quickly and exhaust all of them. - if start >= len(partitions): - break - # `stop` is the index of the last existing partition so far that we - # want to put into the current new partition. - stop = start - partition_size = partitions[start][0].length() - # Add existing partitions into the current new partition until the - # number of rows in the new partition hits `ideal_partition_size`. - while stop < len(partitions) and partition_size < ideal_partition_size: - stop += 1 - if stop < len(partitions): - partition_size += partitions[stop][0].length() - # If the new partition is larger than we want, split the last - # current partition that it contains into two partitions, where - # the first partition has just enough rows to make the current - # new partition have length `ideal_partition_size`, and the second - # partition has the remainder. - if partition_size > ideal_partition_size * max_excess_of_num_partitions: - new_last_partition_size = ideal_partition_size - sum( - row[0].length() for row in partitions[start:stop] - ) - partitions = np.insert( - partitions, - stop + 1, - [ - obj.mask(slice(new_last_partition_size, None), slice(None)) - for obj in partitions[stop] - ], - 0, - ) - partitions[stop, :] = [ - obj.mask(slice(None, new_last_partition_size), slice(None)) - for obj in partitions[stop] - ] - partition_size = ideal_partition_size - new_partitions.append( - cls.column_partitions( - (partitions[start : stop + 1]), - full_axis=partition_size == total_rows, - ) - ) - start = stop + 1 - return np.array(new_partitions) - @classmethod @progress_bar_wrapper def map_partitions(cls, partitions, map_func): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 03ce2244638..421559133ce 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -55,7 +55,8 @@ def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None # partition are themselves virtual partition. The partitions that will # be combined are just the partitions as given to the constructor. 
         if not any(
-            isinstance(o, PandasOnRayDataframeVirtualPartition) for o in list_of_blocks
+            isinstance(obj, PandasOnRayDataframeVirtualPartition)
+            for obj in list_of_blocks
         ):
             self.list_of_partitions_to_combine = list_of_blocks
             return
@@ -65,9 +66,9 @@ def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None
         assert (
             len(
                 set(
-                    o.axis
-                    for o in list_of_blocks
-                    if isinstance(o, PandasOnRayDataframeVirtualPartition)
+                    obj.axis
+                    for obj in list_of_blocks
+                    if isinstance(obj, PandasOnRayDataframeVirtualPartition)
                 )
             )
             == 1
@@ -76,20 +77,20 @@ def __init__(self, list_of_blocks, get_ip=False, full_axis=True, call_queue=None
         # extend and combine the lists of physical partitions.
         if (
             next(
-                o
-                for o in list_of_blocks
-                if isinstance(o, PandasOnRayDataframeVirtualPartition)
+                obj
+                for obj in list_of_blocks
+                if isinstance(obj, PandasOnRayDataframeVirtualPartition)
             ).axis
             == self.axis
         ):
             new_list_of_blocks = []
-            for o in list_of_blocks:
+            for obj in list_of_blocks:
                 new_list_of_blocks.extend(
-                    o.list_of_partitions_to_combine
+                    obj.list_of_partitions_to_combine
                 ) if isinstance(
-                    o, PandasOnRayDataframeVirtualPartition
+                    obj, PandasOnRayDataframeVirtualPartition
                 ) else new_list_of_blocks.append(
-                    o
+                    obj
                 )
             self.list_of_partitions_to_combine = new_list_of_blocks
         # Materialize partitions if the axis of this virtual does not match the virtual partitions
@@ -377,7 +378,7 @@ def length(self):
         if self._length_cache is None:
             if self.axis == 0:
                 self._length_cache = sum(
-                    o.length() for o in self.list_of_partitions_to_combine
+                    obj.length() for obj in self.list_of_partitions_to_combine
                 )
             else:
                 self._length_cache = self.list_of_partitions_to_combine[0].length()
@@ -397,7 +398,7 @@ def width(self):
         if self._width_cache is None:
             if self.axis == 1:
                 self._width_cache = sum(
-                    o.width() for o in self.list_of_partitions_to_combine
+                    obj.width() for obj in self.list_of_partitions_to_combine
                 )
             else:
                 self._width_cache = self.list_of_partitions_to_combine[0].width()
diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py
index 83dc252cde2..4d530d83066 100644
--- a/modin/pandas/test/test_groupby.py
+++ b/modin/pandas/test/test_groupby.py
@@ -2012,8 +2012,8 @@ def test_groupby_with_virtual_partitions():
     big_pandas_df = pandas.concat([pandas_df for _ in range(5)])
 
     # Check that the constructed Modin DataFrame has virtual partitions when
-    # using Ray, and doesn't when using another execution engines.
-    if Engine.get() == "Ray":
+    # using Ray or Dask, and doesn't when using other execution engines.
+ if Engine.get() in ["Ray", "Dask"]: assert issubclass( type(big_modin_df._query_compiler._modin_frame._partitions[0][0]), PandasDataframeAxisPartition, diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index 523be741363..3f8d7e9ed26 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -13,7 +13,7 @@ import modin.pandas as pd from modin.pandas.test.utils import create_test_dfs, test_data_values, df_equals -from modin.config import NPartitions +from modin.config import NPartitions, Engine import pytest @@ -110,3 +110,102 @@ def func_to_apply(partition, row_internal_indices, col_internal_indices, item): md_df._query_compiler._modin_frame = new_modin_frame df_equals(md_df, pd_df) + + +@pytest.mark.skipif( + Engine.get() not in ("Dask", "Ray"), + reason="Rebalancing partitions is only supported for Dask and Ray engines", +) +@pytest.mark.parametrize( + "test_type", + [ + "many_small_dfs", + "concatted_df_with_small_dfs", + "large_df_plus_small_dfs", + ], +) +def test_rebalance_partitions(test_type): + if test_type == "many_small_dfs": + small_dfs = [ + pd.DataFrame( + [[i + j for j in range(0, 1000)]], + columns=[f"col{j}" for j in range(0, 1000)], + index=pd.Index([i]), + ) + for i in range(1, 100001, 1000) + ] + large_df = pd.concat(small_dfs) + col_length = 100 + elif test_type == "concatted_df_with_small_dfs": + small_dfs = [ + pd.DataFrame( + [[i + j for j in range(0, 1000)]], + columns=[f"col{j}" for j in range(0, 1000)], + index=pd.Index([i]), + ) + for i in range(1, 100001, 1000) + ] + large_df = pd.concat([pd.concat(small_dfs)] + small_dfs[:3]) + col_length = 103 + else: + large_df = pd.DataFrame( + [[i + j for j in range(1, 1000)] for i in range(0, 100000, 1000)], + columns=[f"col{j}" for j in range(1, 1000)], + index=pd.Index(list(range(0, 100000, 1000))), + ) + small_dfs = [ + pd.DataFrame( + [[i + j for j in range(0, 1000)]], + columns=[f"col{j}" for j in range(0, 1000)], + index=pd.Index([i]), + ) + for i in range(1, 4001, 1000) + ] + large_df = pd.concat([large_df] + small_dfs[:3]) + col_length = 103 + large_modin_frame = large_df._query_compiler._modin_frame + assert large_modin_frame._partitions.shape == ( + NPartitions.get(), + NPartitions.get(), + ), "Partitions were not rebalanced after concat." + assert all( + isinstance(ptn, large_modin_frame._partition_mgr_cls._column_partitions_class) + for ptn in large_modin_frame._partitions.flatten() + ) + # The following check tests that we can correctly form full-axis virtual partitions + # over the orthogonal axis from non-full-axis virtual partitions. + + def col_apply_func(col): + assert len(col) == col_length, "Partial axis partition detected." + return col + 1 + + large_df = large_df.apply(col_apply_func) + new_large_modin_frame = large_df._query_compiler._modin_frame + assert new_large_modin_frame._partitions.shape == ( + NPartitions.get(), + NPartitions.get(), + ), "Partitions list shape is incorrect." + assert all( + isinstance(ptn, new_large_modin_frame._partition_mgr_cls._partition_class) + for ptn in new_large_modin_frame._partitions.flatten() + ), "Partitions are not block partitioned after apply." + large_df = pd.DataFrame( + query_compiler=large_df._query_compiler.__constructor__(large_modin_frame) + ) + # The following check tests that we can correctly form full-axis virtual partitions + # over the same axis from non-full-axis virtual partitions. 
+ + def row_apply_func(row): + assert len(row) == 1000, "Partial axis partition detected." + return row + 1 + + large_df = large_df.apply(row_apply_func, axis=1) + new_large_modin_frame = large_df._query_compiler._modin_frame + assert new_large_modin_frame._partitions.shape == ( + 4, + 4, + ), "Partitions list shape is incorrect." + assert all( + isinstance(ptn, new_large_modin_frame._partition_mgr_cls._partition_class) + for ptn in new_large_modin_frame._partitions.flatten() + ), "Partitions are not block partitioned after apply."
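
Note: the partition-rebalancing heuristic that ``rebalance_partitions`` now implements
for ``PandasOnRay`` and ``PandasOnDask`` can be illustrated in isolation. The sketch
below is a simplified, standalone model (the function name ``plan_rebalance`` and its
arguments are hypothetical, not Modin API): rebalancing is skipped while the partition
count stays within ``max_excess_of_num_partitions`` (1.5x) of the ideal count, and
otherwise old row partitions are grouped so that each new partition holds roughly
``total_rows / ideal`` rows. It omits the mask-based splitting of an oversized final
group that the real method performs::

    import math

    def plan_rebalance(row_counts, ideal_num_new_partitions, max_excess=1.5):
        """Group old row-partition indices into new partitions (sketch only)."""
        # Below the threshold, rebalancing is skipped entirely.
        if len(row_counts) <= ideal_num_new_partitions * max_excess:
            return [[i] for i in range(len(row_counts))]
        total_rows = sum(row_counts)
        ideal_size = math.ceil(total_rows / ideal_num_new_partitions)
        groups, start = [], 0
        while start < len(row_counts):
            stop, size = start, row_counts[start]
            # Greedily absorb old partitions until the ideal row count is reached.
            while stop + 1 < len(row_counts) and size < ideal_size:
                stop += 1
                size += row_counts[stop]
            groups.append(list(range(start, stop + 1)))
            start = stop + 1
        return groups

    # 100 one-row partitions collapse into 4 groups of ~25 rows each, mirroring
    # the many_small_dfs case exercised by test_rebalance_partitions above.
    print(plan_rebalance([1] * 100, ideal_num_new_partitions=4))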
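
Note: the deferred call queue that ``PandasOnDaskDataframeVirtualPartition`` adopts
from the Ray implementation reduces to a small pattern, sketched below with a
hypothetical ``LazyPartition`` class (not a Modin class). Calls accumulate as
``(func, args, kwargs)`` tuples and run only when the data is needed, which is the
contract of ``add_to_apply_calls`` and ``drain_call_queue`` in the patch above::

    import pandas

    class LazyPartition:
        def __init__(self, df, call_queue=None):
            self._df = df
            self.call_queue = call_queue or []

        def add_to_apply_calls(self, func, *args, **kwargs):
            # Queue the call without executing it and return a new lazy object,
            # just as the virtual partition's add_to_apply_calls does.
            return LazyPartition(self._df, self.call_queue + [(func, args, kwargs)])

        def drain_call_queue(self):
            # Execute every deferred call in order, then clear the queue.
            for func, args, kwargs in self.call_queue:
                self._df = func(self._df, *args, **kwargs)
            self.call_queue = []
            return self._df

    part = LazyPartition(pandas.DataFrame({"a": [1, 2, 3]}))
    lazy = part.add_to_apply_calls(lambda df: df + 1).add_to_apply_calls(len)
    print(lazy.drain_call_queue())  # 3 -- both deferred calls run only here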