Skip to content

Commit

Permalink
FIX-modin-project#2322: fix getting lengths and widths of partitions
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Nov 6, 2020
1 parent bfa24c2 commit d0fb8ef
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
14 changes: 12 additions & 2 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,7 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
and axis == 0
):
is_aligning_applied = True
# we shouldn't modificate modin frame' partitions directly
self._partitions = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df
)
Expand Down Expand Up @@ -1721,6 +1722,14 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
reindexed_self = self._partitions
reindexed_other_list = []

def get_column_widths(partitions):
if len(partitions) > 0:
return [obj.width() for obj in partitions[0]]

def get_row_lengths(partitions):
if len(partitions.T) > 0:
return [obj.length() for obj in partitions.T[0]]

for i in range(len(other)):
if (
is_aligning_applied
Expand All @@ -1734,8 +1743,9 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
axis,
other[i]._partitions,
lambda df: df.reindex(joined_index, axis=axis),
lengths=self._row_lengths if axis == 0 else self._column_widths,
manual_partition=True,
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
)
reindexed_other_list.append(reindexed_other)
return reindexed_self, reindexed_other_list, joined_index
Expand Down
24 changes: 14 additions & 10 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ def broadcast_axis_partitions(
right,
keep_partitioning=False,
lengths=None,
manual_partition=None,
):
"""
Broadcast the right partitions to left and apply a function along full axis.
Expand All @@ -228,9 +227,8 @@ def broadcast_axis_partitions(
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
lengths : bool
lengths : list(int)
The list of lengths to shuffle the object.
manual_partition : list(int)
Returns
-------
Expand All @@ -250,14 +248,23 @@ def broadcast_axis_partitions(
# may want to line to partitioning up with another BlockPartitions object. Since
# we don't need to maintain the partitioning, this gives us the opportunity to
# load-balance the data as well.
kw = {
"num_splits": num_splits,
"other_axis_partition": right_partitions,
}
if lengths:
kw.update(
{
"_lengths": lengths,
"manual_partition": True,
}
)

result_blocks = np.array(
[
part.apply(
preprocessed_map_func,
num_splits=num_splits,
other_axis_partition=right_partitions,
_lengths=lengths,
manual_partition=manual_partition,
**kw,
)
for part in left_partitions
]
Expand Down Expand Up @@ -303,7 +310,6 @@ def map_axis_partitions(
map_func,
keep_partitioning=False,
lengths=None,
manual_partition=None,
):
"""
Applies `map_func` to every partition.
Expand All @@ -320,7 +326,6 @@ def map_axis_partitions(
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
manual_partition : bool
Returns
-------
Expand All @@ -339,7 +344,6 @@ def map_axis_partitions(
keep_partitioning=keep_partitioning,
right=None,
lengths=lengths,
manual_partition=manual_partition,
)

@classmethod
Expand Down

0 comments on commit d0fb8ef

Please sign in to comment.