Skip to content

Commit

Permalink
FIX-modin-project#2374: remove extra code; add pandas way to handle duplicate values in reindex func for binary operations (modin-project#2378)
Browse files Browse the repository at this point in the history

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Dec 4, 2020
1 parent 7458746 commit c2e7f9e
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 98 deletions.
47 changes: 47 additions & 0 deletions asv_bench/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
JOIN_DATA_SIZE = MERGE_DATA_SIZE
ARITHMETIC_DATA_SIZE = GROUPBY_DATA_SIZE

CONCAT_DATA_SIZE = [(10_128, 100, 10_000, 128)]


class TimeGroupBy:
param_names = ["impl", "data_type", "data_size"]
Expand Down Expand Up @@ -111,6 +113,51 @@ def time_merge(self, impl, data_type, data_size, how, sort):
self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort)


class TimeConcat:
    """ASV benchmark: time ``pd.concat`` of two Modin frames of different shapes."""

    param_names = ["data_type", "data_size", "how", "axis"]
    params = [
        ["int"],
        CONCAT_DATA_SIZE,
        ["inner"],
        [0, 1],
    ]

    def setup(self, data_type, data_size, how, axis):
        # data_size is (ncols1, nrows1, ncols2, nrows2); generate_dataframe
        # takes nrows before ncols, hence the swapped order below.
        ncols_first, nrows_first, ncols_second, nrows_second = data_size
        self.df1 = generate_dataframe(
            "modin", data_type, nrows_first, ncols_first, RAND_LOW, RAND_HIGH
        )
        self.df2 = generate_dataframe(
            "modin", data_type, nrows_second, ncols_second, RAND_LOW, RAND_HIGH
        )

    def time_concat(self, data_type, data_size, how, axis):
        # The concatenation itself is the timed operation.
        pd.concat([self.df1, self.df2], axis=axis, join=how)


class TimeBinaryOp:
    """ASV benchmark: time an element-wise binary operation (e.g. ``mul``)
    between two Modin frames of different shapes along a given axis."""

    param_names = ["data_type", "data_size", "binary_op", "axis"]
    params = [
        ["int"],
        CONCAT_DATA_SIZE,
        ["mul"],
        [0, 1],
    ]

    def setup(self, data_type, data_size, binary_op, axis):
        # data_size is (ncols1, nrows1, ncols2, nrows2); generate_dataframe
        # takes nrows before ncols, hence the swapped order below.
        self.df1 = generate_dataframe(
            "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH
        )
        self.df2 = generate_dataframe(
            "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH
        )
        # Resolve the bound method once so the timed body only measures the op.
        self.op = getattr(self.df1, binary_op)

    # Renamed from the copy-pasted ``time_concat``: ASV derives the benchmark
    # name from the method name, so this class must not report as "concat".
    def time_binary_op(self, data_type, data_size, binary_op, axis):
        self.op(self.df2, axis=axis)


class TimeArithmetic:
param_names = ["impl", "data_type", "data_size", "axis"]
params = [
Expand Down
221 changes: 124 additions & 97 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ def internal(block_idx, global_index):
]
return OrderedDict(partition_ids_with_indices)

def _join_index_objects(self, axis, other_index, how, sort):
@staticmethod
def _join_index_objects(axis, indexes, how, sort):
"""
Join the pair of index objects (columns or rows) by a given strategy.
Expand All @@ -976,37 +977,80 @@ def _join_index_objects(self, axis, other_index, how, sort):
Parameters
----------
axis : 0 or 1
The axis index object to join (0 - rows, 1 - columns).
other_index : Index
The other_index to join on.
how : {'left', 'right', 'inner', 'outer'}
The type of join to join to make.
sort : boolean
Whether or not to sort the joined index
axis : 0 or 1
The axis index object to join (0 - rows, 1 - columns).
indexes : list(Index)
The indexes to join on.
how : {'left', 'right', 'inner', 'outer'}
The type of join to join to make.
sort : boolean
Whether or not to sort the joined index
Returns
-------
Index
Joined indices.
(Index, func)
Joined index with make_reindexer func
"""
assert isinstance(indexes, list)

def merge_index(obj1, obj2):
# define helper functions
def merge(left_index, right_index):
if axis == 1 and how == "outer" and not sort:
return obj1.union(obj2, sort=False)
return left_index.union(right_index, sort=False)
else:
return obj1.join(obj2, how=how, sort=sort)

if isinstance(other_index, list):
joined_obj = self.columns if axis else self.index
# TODO: revisit for performance
for obj in other_index:
joined_obj = merge_index(joined_obj, obj)
return joined_obj
if axis:
return merge_index(self.columns, other_index)
return left_index.join(right_index, how=how, sort=sort)

# define condition for joining indexes
do_join_index = False
for index in indexes[1:]:
if not indexes[0].equals(index):
do_join_index = True
break

# define condition for joining indexes with getting indexers
is_duplicates = any(not index.is_unique for index in indexes) and axis == 0
indexers = []
if is_duplicates:
indexers = [None] * len(indexes)

# perform joining indexes
if do_join_index:
if len(indexes) == 2 and is_duplicates:
# in case of count of indexes > 2 we should perform joining all indexes
# after that get indexers
# in the fast path we can obtain joined_index and indexers in one call
joined_index, indexers[0], indexers[1] = indexes[0].join(
indexes[1], how=how, sort=sort, return_indexers=True
)
else:
joined_index = indexes[0]
# TODO: revisit for performance
for index in indexes[1:]:
joined_index = merge(joined_index, index)

if is_duplicates:
for i, index in enumerate(indexes):
indexers[i] = index.get_indexer_for(joined_index)
else:
return self.index.join(other_index, how=how, sort=sort)
joined_index = indexes[0].copy()

def make_reindexer(do_reindex: bool, frame_idx: int):
# the order of the frames must match the order of the indexes
if not do_reindex:
return lambda df: df

if is_duplicates:
assert indexers != []

return lambda df: df._reindex_with_indexers(
{0: [joined_index, indexers[frame_idx]]},
copy=True,
allow_dups=True,
)

return lambda df: df.reindex(joined_index, axis=axis)

return joined_index, make_reindexer

# Internal methods
# These methods are for building the correct answer in a modular way.
Expand Down Expand Up @@ -1697,19 +1741,19 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
Parameters
----------
axis : 0 or 1
The axis to copartition along (0 - rows, 1 - columns).
other : BasePandasFrame
The other dataframes(s) to copartition against.
how : str
How to manage joining the index object ("left", "right", etc.)
sort : boolean
Whether or not to sort the joined index.
force_repartition : boolean
Whether or not to force the repartitioning. By default,
this method will skip repartitioning if it is possible. This is because
reindexing is extremely inefficient. Because this method is used to
`join` or `append`, it is vital that the internal indices match.
axis : 0 or 1
The axis to copartition along (0 - rows, 1 - columns).
other : BasePandasFrame
The other dataframes(s) to copartition against.
how : str
How to manage joining the index object ("left", "right", etc.)
sort : boolean
Whether or not to sort the joined index.
force_repartition : bool, default False
Whether or not to force the repartitioning. By default,
this method will skip repartitioning if it is possible. This is because
reindexing is extremely inefficient. Because this method is used to
`join` or `append`, it is vital that the internal indices match.
Returns
-------
Expand All @@ -1719,79 +1763,62 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
if isinstance(other, type(self)):
other = [other]

is_aligning_applied = False
for i in range(len(other)):
if (
len(self._partitions) != len(other[i]._partitions)
and len(self.axes[0]) == len(other[i].axes[0])
and axis == 0
):
is_aligning_applied = True
self._partitions = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df
)
other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions(
axis, other[i]._partitions, lambda df: df
)

if (
all(o.axes[axis].equals(self.axes[axis]) for o in other)
and not is_aligning_applied
):
return (
self._partitions,
[self._simple_shuffle(axis, o) for o in other],
self.axes[axis].copy(),
)
# define helper functions
def get_axis_lengths(partitions, axis):
if axis:
return [obj.width() for obj in partitions[0]]
return [obj.length() for obj in partitions.T[0]]

index_other_obj = [o.axes[axis] for o in other]
joined_index = self._join_index_objects(axis, index_other_obj, how, sort)
# We have to set these because otherwise when we perform the functions it may
# end up serializing this entire object.
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj
self_index = self.axes[axis]
others_index = [o.axes[axis] for o in other]
joined_index, make_reindexer = self._join_index_objects(
axis, [self_index] + others_index, how, sort
)

def make_map_func():
if not joined_index.is_unique and axis == 0:
return lambda df: df
return lambda df: df.reindex(joined_index, axis=axis)
# define conditions for reindexing and repartitioning `self` frame
do_reindex_self = not self_index.equals(joined_index)
do_repartition_self = force_repartition or do_reindex_self

# Start with this and we'll repartition the first time, and then not again.
if is_aligning_applied or (
not force_repartition and left_old_idx.equals(joined_index)
):
reindexed_self = self._partitions
else:
# perform repartitioning and reindexing for `self` frame if needed
if do_repartition_self:
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis,
self._partitions,
make_map_func(),
# self frame has 0 idx
make_reindexer(do_reindex_self, 0),
)
else:
reindexed_self = self._partitions

def get_column_widths(partitions):
if len(partitions) > 0:
return [obj.width() for obj in partitions[0]]
# define length of `self` and `other` frames to aligning purpose
self_lengths = get_axis_lengths(reindexed_self, axis)
others_lengths = [o._axes_lengths[axis] for o in other]

def get_row_lengths(partitions):
if len(partitions.T) > 0:
return [obj.length() for obj in partitions.T[0]]
# define conditions for reindexing and repartitioning `other` frames
do_reindex_others = [not index.equals(joined_index) for index in others_index]

reindexed_other_list = []
do_repartition_others = [None] * len(other)
for i in range(len(other)):
if is_aligning_applied or (
not force_repartition and right_old_idxes[i].equals(joined_index)
):
reindexed_other = other[i]._partitions
else:
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
do_repartition_others[i] = (
force_repartition
or do_reindex_others[i]
or others_lengths[i] != self_lengths
)

# perform repartitioning and reindexing for `other` frames if needed
reindexed_other_list = [None] * len(other)
for i in range(len(other)):
if do_repartition_others[i]:
reindexed_other_list[i] = other[i]._frame_mgr_cls.map_axis_partitions(
axis,
other[i]._partitions,
make_map_func(),
lengths=get_row_lengths(reindexed_self)
if axis == 0
else get_column_widths(reindexed_self),
# indices of others frame start from 1 (0 - self frame)
make_reindexer(do_reindex_others[i], 1 + i),
lengths=self_lengths,
)
reindexed_other_list.append(reindexed_other)
else:
reindexed_other_list[i] = other[i]._partitions

return reindexed_self, reindexed_other_list, joined_index

def _simple_shuffle(self, axis, other):
Expand Down Expand Up @@ -1900,7 +1927,7 @@ def _concat(self, axis, others, how, sort):
]
else:
left_parts, right_parts, joined_index = self._copartition(
axis ^ 1, others, how, sort, force_repartition=True
axis ^ 1, others, how, sort, force_repartition=False
)
new_lengths = None
new_widths = None
Expand Down
4 changes: 3 additions & 1 deletion modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def broadcast_axis_partitions(
right : The right partitions.
keep_partitioning : boolean. Default is False
The flag to keep partitions for Modin Frame.
lengths : list(int)
lengths : list(int), default None
The list of lengths to shuffle the object.
Returns
Expand All @@ -250,6 +250,8 @@ def broadcast_axis_partitions(
# partitions as best we can right now.
if keep_partitioning:
num_splits = len(left) if axis == 0 else len(left.T)
elif lengths:
num_splits = len(lengths)
else:
num_splits = cls._compute_num_partitions()
preprocessed_map_func = cls.preprocess_func(apply_func)
Expand Down
14 changes: 14 additions & 0 deletions modin/test/backends/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# governing permissions and limitations under the License.

import modin.pandas as pd
from modin.pandas.test.utils import create_test_dfs

pd.DEFAULT_NPARTITIONS = 4


def test_aligning_blocks():
Expand All @@ -38,3 +41,14 @@ def test_aligning_blocks_with_duplicated_index():
df2 = pd.DataFrame(data21).append(pd.DataFrame(data22))

repr(df1 - df2)


def test_aligning_partitions():
    """Regression test: assigning a column taken from a frame with a
    different row partitioning must not break the resulting frame."""
    values = [0, 1, 2, 3, 4, 5]
    source_df, _ = create_test_dfs({"a": values, "b": values})
    head_df = source_df.loc[:2]

    stacked_df = head_df.append(head_df)

    # Column assignment forces copartitioning between the two frames;
    # repr materializes the result so alignment errors surface here.
    stacked_df["c"] = source_df["b"]
    repr(stacked_df)

0 comments on commit c2e7f9e

Please sign in to comment.