From c2e7f9ecb6a31cd0d59f4ce17dacb059980c45f7 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev <45976948+anmyachev@users.noreply.github.com> Date: Fri, 4 Dec 2020 11:20:09 +0300 Subject: [PATCH] FIX-#2374: remove extra code; add pandas way to handle duplicate values in reindex func for binary operations (#2378) Signed-off-by: Anatoly Myachev --- asv_bench/benchmarks/benchmarks.py | 47 ++++ modin/engines/base/frame/data.py | 221 ++++++++++-------- modin/engines/base/frame/partition_manager.py | 4 +- modin/test/backends/pandas/test_internals.py | 14 ++ 4 files changed, 188 insertions(+), 98 deletions(-) diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py index 33f597cefdf..74f798cd603 100644 --- a/asv_bench/benchmarks/benchmarks.py +++ b/asv_bench/benchmarks/benchmarks.py @@ -41,6 +41,8 @@ JOIN_DATA_SIZE = MERGE_DATA_SIZE ARITHMETIC_DATA_SIZE = GROUPBY_DATA_SIZE +CONCAT_DATA_SIZE = [(10_128, 100, 10_000, 128)] + class TimeGroupBy: param_names = ["impl", "data_type", "data_size"] @@ -111,6 +113,51 @@ def time_merge(self, impl, data_type, data_size, how, sort): self.df1.merge(self.df2, on=self.df1.columns[0], how=how, sort=sort) +class TimeConcat: + param_names = ["data_type", "data_size", "how", "axis"] + params = [ + ["int"], + CONCAT_DATA_SIZE, + ["inner"], + [0, 1], + ] + + def setup(self, data_type, data_size, how, axis): + # shape for generate_dataframe: first - ncols, second - nrows + self.df1 = generate_dataframe( + "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ) + self.df2 = generate_dataframe( + "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH + ) + + def time_concat(self, data_type, data_size, how, axis): + pd.concat([self.df1, self.df2], axis=axis, join=how) + + +class TimeBinaryOp: + param_names = ["data_type", "data_size", "binary_op", "axis"] + params = [ + ["int"], + CONCAT_DATA_SIZE, + ["mul"], + [0, 1], + ] + + def setup(self, data_type, data_size, binary_op, axis): + # shape for generate_dataframe: first - ncols, second - nrows + self.df1 = generate_dataframe( + "modin", data_type, data_size[1], data_size[0], RAND_LOW, RAND_HIGH + ) + self.df2 = generate_dataframe( + "modin", data_type, data_size[3], data_size[2], RAND_LOW, RAND_HIGH + ) + self.op = getattr(self.df1, binary_op) + + def time_concat(self, data_type, data_size, binary_op, axis): + self.op(self.df2, axis=axis) + + class TimeArithmetic: param_names = ["impl", "data_type", "data_size", "axis"] params = [ diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index f18dab29560..2346b0145a7 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -967,7 +967,8 @@ def internal(block_idx, global_index): ] return OrderedDict(partition_ids_with_indices) - def _join_index_objects(self, axis, other_index, how, sort): + @staticmethod + def _join_index_objects(axis, indexes, how, sort): """ Join the pair of index objects (columns or rows) by a given strategy. @@ -976,37 +977,80 @@ def _join_index_objects(self, axis, other_index, how, sort): Parameters ---------- - axis : 0 or 1 - The axis index object to join (0 - rows, 1 - columns). - other_index : Index - The other_index to join on. - how : {'left', 'right', 'inner', 'outer'} - The type of join to join to make. - sort : boolean - Whether or not to sort the joined index + axis : 0 or 1 + The axis index object to join (0 - rows, 1 - columns). + indexes : list(Index) + The indexes to join on. + how : {'left', 'right', 'inner', 'outer'} + The type of join to join to make. + sort : boolean + Whether or not to sort the joined index Returns ------- - Index - Joined indices. + (Index, func) + Joined index with make_reindexer func """ + assert isinstance(indexes, list) - def merge_index(obj1, obj2): + # define helper functions + def merge(left_index, right_index): if axis == 1 and how == "outer" and not sort: - return obj1.union(obj2, sort=False) + return left_index.union(right_index, sort=False) else: - return obj1.join(obj2, how=how, sort=sort) - - if isinstance(other_index, list): - joined_obj = self.columns if axis else self.index - # TODO: revisit for performance - for obj in other_index: - joined_obj = merge_index(joined_obj, obj) - return joined_obj - if axis: - return merge_index(self.columns, other_index) + return left_index.join(right_index, how=how, sort=sort) + + # define condition for joining indexes + do_join_index = False + for index in indexes[1:]: + if not indexes[0].equals(index): + do_join_index = True + break + + # define condition for joining indexes with getting indexers + is_duplicates = any(not index.is_unique for index in indexes) and axis == 0 + indexers = [] + if is_duplicates: + indexers = [None] * len(indexes) + + # perform joining indexes + if do_join_index: + if len(indexes) == 2 and is_duplicates: + # in case of count of indexes > 2 we should perform joining all indexes + # after that get indexers + # in the fast path we can obtain joined_index and indexers in one call + joined_index, indexers[0], indexers[1] = indexes[0].join( + indexes[1], how=how, sort=sort, return_indexers=True + ) + else: + joined_index = indexes[0] + # TODO: revisit for performance + for index in indexes[1:]: + joined_index = merge(joined_index, index) + + if is_duplicates: + for i, index in enumerate(indexes): + indexers[i] = index.get_indexer_for(joined_index) else: - return self.index.join(other_index, how=how, sort=sort) + joined_index = indexes[0].copy() + + def make_reindexer(do_reindex: bool, frame_idx: int): + # the order of the frames must match the order of the indexes + if not do_reindex: + return lambda df: df + + if is_duplicates: + assert indexers != [] + + return lambda df: df._reindex_with_indexers( + {0: [joined_index, indexers[frame_idx]]}, + copy=True, + allow_dups=True, + ) + + return lambda df: df.reindex(joined_index, axis=axis) + + return joined_index, make_reindexer # Internal methods # These methods are for building the correct answer in a modular way. @@ -1697,19 +1741,19 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): Parameters ---------- - axis : 0 or 1 - The axis to copartition along (0 - rows, 1 - columns). - other : BasePandasFrame - The other dataframes(s) to copartition against. - how : str - How to manage joining the index object ("left", "right", etc.) - sort : boolean - Whether or not to sort the joined index. - force_repartition : boolean - Whether or not to force the repartitioning. By default, - this method will skip repartitioning if it is possible. This is because - reindexing is extremely inefficient. Because this method is used to - `join` or `append`, it is vital that the internal indices match. + axis : 0 or 1 + The axis to copartition along (0 - rows, 1 - columns). + other : BasePandasFrame + The other dataframes(s) to copartition against. + how : str + How to manage joining the index object ("left", "right", etc.) + sort : boolean + Whether or not to sort the joined index. + force_repartition : bool, default False + Whether or not to force the repartitioning. By default, + this method will skip repartitioning if it is possible. This is because + reindexing is extremely inefficient. Because this method is used to + `join` or `append`, it is vital that the internal indices match. Returns ------- @@ -1719,79 +1763,62 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): if isinstance(other, type(self)): other = [other] - is_aligning_applied = False - for i in range(len(other)): - if ( - len(self._partitions) != len(other[i]._partitions) - and len(self.axes[0]) == len(other[i].axes[0]) - and axis == 0 - ): - is_aligning_applied = True - self._partitions = self._frame_mgr_cls.map_axis_partitions( - axis, self._partitions, lambda df: df - ) - other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions( - axis, other[i]._partitions, lambda df: df - ) - - if ( - all(o.axes[axis].equals(self.axes[axis]) for o in other) - and not is_aligning_applied - ): - return ( - self._partitions, - [self._simple_shuffle(axis, o) for o in other], - self.axes[axis].copy(), - ) + # define helper functions + def get_axis_lengths(partitions, axis): + if axis: + return [obj.width() for obj in partitions[0]] + return [obj.length() for obj in partitions.T[0]] - index_other_obj = [o.axes[axis] for o in other] - joined_index = self._join_index_objects(axis, index_other_obj, how, sort) - # We have to set these because otherwise when we perform the functions it may - # end up serializing this entire object. - left_old_idx = self.axes[axis] - right_old_idxes = index_other_obj + self_index = self.axes[axis] + others_index = [o.axes[axis] for o in other] + joined_index, make_reindexer = self._join_index_objects( + axis, [self_index] + others_index, how, sort + ) - def make_map_func(): - if not joined_index.is_unique and axis == 0: - return lambda df: df - return lambda df: df.reindex(joined_index, axis=axis) + # define conditions for reindexing and repartitioning `self` frame + do_reindex_self = not self_index.equals(joined_index) + do_repartition_self = force_repartition or do_reindex_self - # Start with this and we'll repartition the first time, and then not again. - if is_aligning_applied or ( - not force_repartition and left_old_idx.equals(joined_index) - ): - reindexed_self = self._partitions - else: + # perform repartitioning and reindexing for `self` frame if needed + if do_repartition_self: reindexed_self = self._frame_mgr_cls.map_axis_partitions( axis, self._partitions, - make_map_func(), + # self frame has 0 idx + make_reindexer(do_reindex_self, 0), ) + else: + reindexed_self = self._partitions - def get_column_widths(partitions): - if len(partitions) > 0: - return [obj.width() for obj in partitions[0]] + # define length of `self` and `other` frames to aligning purpose + self_lengths = get_axis_lengths(reindexed_self, axis) + others_lengths = [o._axes_lengths[axis] for o in other] - def get_row_lengths(partitions): - if len(partitions.T) > 0: - return [obj.length() for obj in partitions.T[0]] + # define conditions for reindexing and repartitioning `other` frames + do_reindex_others = [not index.equals(joined_index) for index in others_index] - reindexed_other_list = [] + do_repartition_others = [None] * len(other) for i in range(len(other)): - if is_aligning_applied or ( - not force_repartition and right_old_idxes[i].equals(joined_index) - ): - reindexed_other = other[i]._partitions - else: - reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions( + do_repartition_others[i] = ( + force_repartition + or do_reindex_others[i] + or others_lengths[i] != self_lengths + ) + + # perform repartitioning and reindexing for `other` frames if needed + reindexed_other_list = [None] * len(other) + for i in range(len(other)): + if do_repartition_others[i]: + reindexed_other_list[i] = other[i]._frame_mgr_cls.map_axis_partitions( axis, other[i]._partitions, - make_map_func(), - lengths=get_row_lengths(reindexed_self) - if axis == 0 - else get_column_widths(reindexed_self), + # indices of others frame start from 1 (0 - self frame) + make_reindexer(do_reindex_others[i], 1 + i), + lengths=self_lengths, ) - reindexed_other_list.append(reindexed_other) + else: + reindexed_other_list[i] = other[i]._partitions + return reindexed_self, reindexed_other_list, joined_index def _simple_shuffle(self, axis, other): @@ -1900,7 +1927,7 @@ def _concat(self, axis, others, how, sort): ] else: left_parts, right_parts, joined_index = self._copartition( - axis ^ 1, others, how, sort, force_repartition=True + axis ^ 1, others, how, sort, force_repartition=False ) new_lengths = None new_widths = None diff --git a/modin/engines/base/frame/partition_manager.py b/modin/engines/base/frame/partition_manager.py index 7bd3dfc964d..8310f0d0c53 100644 --- a/modin/engines/base/frame/partition_manager.py +++ b/modin/engines/base/frame/partition_manager.py @@ -238,7 +238,7 @@ def broadcast_axis_partitions( right : The right partitions. keep_partitioning : boolean. Default is False The flag to keep partitions for Modin Frame. - lengths : list(int) + lengths : list(int), default None The list of lengths to shuffle the object. Returns @@ -250,6 +250,8 @@ def broadcast_axis_partitions( # partitions as best we can right now. if keep_partitioning: num_splits = len(left) if axis == 0 else len(left.T) + elif lengths: + num_splits = len(lengths) else: num_splits = cls._compute_num_partitions() preprocessed_map_func = cls.preprocess_func(apply_func) diff --git a/modin/test/backends/pandas/test_internals.py b/modin/test/backends/pandas/test_internals.py index 266c6d7ff8e..cb544aed471 100644 --- a/modin/test/backends/pandas/test_internals.py +++ b/modin/test/backends/pandas/test_internals.py @@ -12,6 +12,9 @@ # governing permissions and limitations under the License. import modin.pandas as pd +from modin.pandas.test.utils import create_test_dfs + +pd.DEFAULT_NPARTITIONS = 4 def test_aligning_blocks(): @@ -38,3 +41,14 @@ def test_aligning_blocks_with_duplicated_index(): df2 = pd.DataFrame(data21).append(pd.DataFrame(data22)) repr(df1 - df2) + + +def test_aligning_partitions(): + data = [0, 1, 2, 3, 4, 5] + modin_df1, _ = create_test_dfs({"a": data, "b": data}) + modin_df = modin_df1.loc[:2] + + modin_df2 = modin_df.append(modin_df) + + modin_df2["c"] = modin_df1["b"] + repr(modin_df2)