Skip to content

Commit

Permalink
FIX-modin-project#2322: add aligning partition' blocks
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Nov 6, 2020
1 parent fc34852 commit 040766d
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
9 changes: 9 additions & 0 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,8 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
"""
Copartition two dataframes.
Perform aligning of partitions, index and partition blocks.
Parameters
----------
axis : 0 or 1
Expand All @@ -1672,6 +1674,7 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):

is_aligning_applied = False
for i in range(len(other)):
# aligning partitions
if (
len(self._partitions) != len(other[i]._partitions)
and len(self.axes[0]) == len(other[i].axes[0])
Expand All @@ -1689,11 +1692,13 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
all(o.axes[axis].equals(self.axes[axis]) for o in other)
and not is_aligning_applied
):
# aligning self.list_of_blocks
return (
self._partitions,
[self._simple_shuffle(axis, o) for o in other],
self.axes[axis].copy(),
)

index_other_obj = [o.axes[axis] for o in other]
joined_index = self._join_index_objects(axis, index_other_obj, how, sort)
# We have to set these because otherwise when we perform the functions it may
Expand All @@ -1708,6 +1713,7 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
and not is_avoid_reindex
and (force_repartition or not left_old_idx.equals(joined_index))
):
# aligning index without aligning partitions' blocks
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis)
)
Expand All @@ -1723,10 +1729,13 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
):
reindexed_other = other[i]._partitions
else:
# aligning index with aligning partitions' blocks
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
lambda df: df.reindex(joined_index, axis=axis),
lengths=self._row_lengths if axis == 0 else self._column_widths,
manual_partition=True,
)
reindexed_other_list.append(reindexed_other)
return reindexed_self, reindexed_other_list, joined_index
Expand Down
44 changes: 29 additions & 15 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,27 @@ def broadcast_axis_partitions(
left,
right,
keep_partitioning=False,
lengths=None,
manual_partition=None,
):
"""
Broadcast the right partitions to left and apply a function along full axis.
Parameters
----------
axis : The axis to apply and broadcast over.
apply_func : The function to apply.
left : The left partitions.
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
axis : The axis to apply and broadcast over.
apply_func : The function to apply.
left : The left partitions.
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
manual_partition : bool
Returns
-------
A new `np.array` of partition objects.
A new `np.array` of partition objects.
"""
# Since we are already splitting the DataFrame back up after an
# operation, we will just use this time to compute the number of
Expand All @@ -251,6 +256,8 @@ def broadcast_axis_partitions(
preprocessed_map_func,
num_splits=num_splits,
other_axis_partition=right_partitions,
_lengths=lengths,
manual_partition=manual_partition,
)
for part in left_partitions
]
Expand Down Expand Up @@ -295,20 +302,25 @@ def map_axis_partitions(
partitions,
map_func,
keep_partitioning=False,
lengths=None,
manual_partition=None,
):
"""
Applies `map_func` to every partition.
Parameters
----------
axis : 0 or 1
The axis to perform the map across (0 - index, 1 - columns).
partitions : NumPy array
The partitions of Modin Frame.
map_func : callable
The function to apply.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
axis : 0 or 1
The axis to perform the map across (0 - index, 1 - columns).
partitions : NumPy array
The partitions of Modin Frame.
map_func : callable
The function to apply.
keep_partitioning : bool. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
manual_partition : bool
Returns
-------
Expand All @@ -326,6 +338,8 @@ def map_axis_partitions(
apply_func=map_func,
keep_partitioning=keep_partitioning,
right=None,
lengths=lengths,
manual_partition=manual_partition,
)

@classmethod
Expand Down

0 comments on commit 040766d

Please sign in to comment.