Skip to content

Commit

Permalink
FIX-modin-project#4138: optimize internal '.mask()' flow
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev committed Feb 1, 2022
1 parent 32cff0d commit 01b8977
Showing 1 changed file with 57 additions and 11 deletions.
68 changes: 57 additions & 11 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,21 @@ def mask(
if row_labels is not None:
row_positions = self.index.get_indexer_for(row_labels)
if row_positions is not None:
sorted_row_positions = (
row_positions
if (
(is_range_like(row_positions) and row_positions.step > 0)
# `np.sort` of empty list returns an array with `float` dtype,
# which doesn't work well as an indexer
or len(row_positions) == 0
)
else np.sort(row_positions)
)
# Get dict of row_parts as {row_index: row_internal_indices}
# TODO: Rename `row_partitions_list`->`row_partitions_dict`
row_partitions_list = self._get_dict_of_block_index(0, row_positions)
row_partitions_list = self._get_dict_of_block_index(
0, sorted_row_positions, are_indices_sorted=True
)
new_row_lengths = [
len(
# Row lengths for slice are calculated as the length of the slice
Expand All @@ -628,7 +640,7 @@ def mask(
slice(row_positions.start, row_positions.stop, row_positions.step)
# TODO: Fast range processing of non-1-step ranges is not yet supported
if is_range_like(row_positions) and row_positions.step > 0
else sorted(row_positions)
else sorted_row_positions
]
else:
row_partitions_list = {
Expand All @@ -641,8 +653,20 @@ def mask(
if col_labels is not None:
col_positions = self.columns.get_indexer_for(col_labels)
if col_positions is not None:
sorted_col_positions = (
col_positions
if (
(is_range_like(col_positions) and col_positions.step > 0)
# `np.sort` of empty list returns an array with `float` dtype,
# which doesn't work well as an indexer
or len(col_positions) == 0
)
else np.sort(col_positions)
)
# Get dict of col_parts as {col_index: col_internal_indices}
col_partitions_list = self._get_dict_of_block_index(1, col_positions)
col_partitions_list = self._get_dict_of_block_index(
1, sorted_col_positions, are_indices_sorted=True
)
new_col_widths = [
len(
# Column widths for slice are calculated as the length of the slice
Expand All @@ -662,7 +686,7 @@ def mask(
col_positions.start, col_positions.stop, col_positions.step
)
else:
monotonic_col_idx = sorted(col_positions)
monotonic_col_idx = sorted_col_positions
new_columns = self.columns[monotonic_col_idx]
ErrorMessage.catch_bugs_and_request_email(
failure_condition=sum(new_col_widths) != len(new_columns),
Expand Down Expand Up @@ -729,14 +753,14 @@ def mask(
# the old. This information is sent to `_reorder_labels`.
if row_positions is not None:
row_order_mapping = dict(
zip(sorted(row_positions), range(len(row_positions)))
zip(sorted_row_positions, range(len(row_positions)))
)
new_row_order = [row_order_mapping[idx] for idx in row_positions]
else:
new_row_order = None
if col_positions is not None:
col_order_mapping = dict(
zip(sorted(col_positions), range(len(col_positions)))
zip(sorted_col_positions, range(len(col_positions)))
)
new_col_order = [col_order_mapping[idx] for idx in col_positions]
else:
Expand Down Expand Up @@ -1061,7 +1085,7 @@ def numeric_columns(self, include_bool=True):
columns.append(col)
return columns

def _get_dict_of_block_index(self, axis, indices):
def _get_dict_of_block_index(self, axis, indices, are_indices_sorted=False):
"""
Convert indices to an ordered dict mapping partition (or block) index to internal indices in said partition.
Expand All @@ -1071,6 +1095,12 @@ def _get_dict_of_block_index(self, axis, indices):
The axis along which to get the indices (0 - rows, 1 - columns).
indices : list of int, slice
A list of global indices to convert.
are_indices_sorted : bool, default: False
Flag indicating whether the `indices` sequence is sorted by ascending or not.
Note: the internal algorithm requires for the `indices` to be sorted, this
flag is used for optimization in order to not sort already sorted data.
Be careful when passing ``True`` for this flag, if the data appears to be unsorted
with the flag set to ``True`` this would lead to undefined behavior.
Returns
-------
Expand All @@ -1087,6 +1117,9 @@ def _get_dict_of_block_index(self, axis, indices):
if isinstance(indices, slice) or (is_range_like(indices) and indices.step == 1):
# Converting range-like indexer to slice
indices = slice(indices.start, indices.stop, indices.step)
# Empty selection case
if indices.start == indices.stop:
return OrderedDict()
if is_full_grab_slice(indices, sequence_len=len(self.axes[axis])):
return OrderedDict(
zip(
Expand Down Expand Up @@ -1142,10 +1175,23 @@ def _get_dict_of_block_index(self, axis, indices):
)
dict_of_slices.update({last_part: slice(None, last_idx[0])})
return dict_of_slices
# Sort and convert negative indices to positive
indices = np.sort(
[i if i >= 0 else max(0, len(self.axes[axis]) + i) for i in indices]
)
if isinstance(indices, list):
# Converting python list to numpy for faster processing
indices = np.array(indices, dtype=np.int64)
negative_mask = np.less(indices, 0)
has_negative = np.any(negative_mask)
if has_negative:
# We're going to modify 'indices' inplace in a numpy way, so doing a copy/converting indices to numpy.
indices = (
indices.copy()
if isinstance(indices, np.ndarray)
else np.array(indices, dtype=np.int64)
)
indices[negative_mask] = indices[negative_mask] % len(self.axes[axis])
# If the `indices` array was modified because of the negative indices convertion
# then the original order was broken and so we have to sort anyway:
if not has_negative and not are_indices_sorted:
indices = np.sort(indices)
if axis == 0:
bins = np.array(self._row_lengths)
else:
Expand Down

0 comments on commit 01b8977

Please sign in to comment.