Skip to content

Commit

Permalink
FIX-modin-project#2322: fix case with duplicated index
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Nov 6, 2020
1 parent 6a00aef commit 504fd91
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 30 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ jobs:
run: python -m pytest modin/config/test
- shell: bash -l {0}
run: python -m pytest modin/test/test_envvar_catcher.py
- shell: bash -l {0}
run: python -m pytest modin/test/test_internals.py

test-defaults:
needs: [lint-commit, lint-flake8, lint-black, test-api, test-headers]
Expand Down
30 changes: 16 additions & 14 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1707,20 +1707,23 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj

is_avoid_reindex = len(joined_index) != len(joined_index.unique()) and axis == 0
def make_map_func():
    """
    Build the function applied to each axis partition while aligning indices.

    Reads ``joined_index`` and ``axis`` from the enclosing scope.

    Returns
    -------
    callable
        A function taking a frame. It returns the frame unchanged when
        ``axis == 0`` and ``joined_index`` contains duplicated labels;
        otherwise it returns the frame reindexed to ``joined_index``
        along ``axis``.
    """
    # Test the cheap scalar first; `is_unique` answers the duplicate question
    # without materializing the array of unique labels.
    if axis == 0 and not joined_index.is_unique:
        # NOTE(review): presumably the reindex is skipped because it cannot be
        # applied with duplicated row labels (see #2322) - confirm.
        return lambda df: df
    return lambda df: df.reindex(joined_index, axis=axis)

# Start with this and we'll repartition the first time, and then not again.
if (
not is_aligning_applied
and not is_avoid_reindex
and (force_repartition or not left_old_idx.equals(joined_index))
if is_aligning_applied or (
not force_repartition and left_old_idx.equals(joined_index)
):
reindexed_self = self._partitions
else:
# aligning index without aligning partitions' blocks
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis)
axis,
self._partitions,
make_map_func(),
)
else:
reindexed_self = self._partitions
reindexed_other_list = []

def get_column_widths(partitions):
if len(partitions) > 0:
Expand All @@ -1730,19 +1733,18 @@ def get_row_lengths(partitions):
if len(partitions.T) > 0:
return [obj.length() for obj in partitions.T[0]]

reindexed_other_list = []
for i in range(len(other)):
if (
is_aligning_applied
or is_avoid_reindex
or (not force_repartition and right_old_idxes[i].equals(joined_index))
if is_aligning_applied or (
not force_repartition and right_old_idxes[i].equals(joined_index)
):
reindexed_other = other[i]._partitions
else:
# aligning index with aligning partitions' blocks
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
lambda df: df.reindex(joined_index, axis=axis),
make_map_func(),
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
Expand Down
8 changes: 2 additions & 6 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,8 @@ def broadcast_axis_partitions(
"other_axis_partition": right_partitions,
}
if lengths:
kw.update(
{
"_lengths": lengths,
"manual_partition": True,
}
)
kw["_lengths"] = lengths
kw["manual_partition"] = True

result_blocks = np.array(
[
Expand Down
10 changes: 0 additions & 10 deletions modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1897,16 +1897,6 @@ def test_to_gbq():
modin_df.to_gbq("modin.table")


def test_internal_error_2322():
    """Smoke test for #2322: assigning a shorter column and printing must not raise."""
    # 162 rows minus the first two, then renumbered from zero
    frame = pd.DataFrame(["-22\n"] * 162).iloc[2:, :]
    frame.reset_index(drop=True, inplace=True)

    # The new column is built from a Series shorter than the frame
    shorter_column = pd.Series(["24.67\n"] * 145)
    frame["T"] = shorter_column

    # No assertion: the test passes if rendering the frame succeeds
    repr(frame)


def test_cleanup():
filenames = [
TEST_PARQUET_FILENAME,
Expand Down
40 changes: 40 additions & 0 deletions modin/test/test_internals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import modin.pandas as pd


def test_aligning_blocks():
    """
    Check frames that have the same number of rows but different blocks.

    Covers the case where modin frames agree on row count while their
    blocks (partition.list_of_blocks) differ. See #2322 for details.
    """
    # 162 rows minus the first two, then renumbered from zero
    frame = pd.DataFrame(["-22\n"] * 162).iloc[2:, :]
    frame.reset_index(drop=True, inplace=True)

    # The new column is built from a Series shorter than the frame
    shorter_column = pd.Series(["24.67\n"] * 145)
    frame["T"] = shorter_column

    # No assertion: the test passes if rendering the frame succeeds
    repr(frame)


def test_aligning_blocks_with_duplicated_index():
    """
    Check that the difference of frames with duplicated index values renders.

    Same problem as in `test_aligning_blocks`, but with duplicated values in
    the index. There is no assertion: the test passes if building and
    rendering the difference does not raise.
    """
    data11 = [0, 1]
    data12 = [2, 3]

    data21 = [0]
    data22 = [1, 2, 3]

    # `DataFrame.append` is deprecated since pandas 1.4 and removed in 2.0;
    # `pd.concat` stacks the same pieces and keeps each piece's own labels,
    # so the resulting indices still contain repeated values.
    df1 = pd.concat([pd.DataFrame(data11), pd.DataFrame(data12)])
    df2 = pd.concat([pd.DataFrame(data21), pd.DataFrame(data22)])

    repr(df1 - df2)

0 comments on commit 504fd91

Please sign in to comment.