Skip to content

Commit

Permalink
Series repeat (#1593)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Co-authored-by: Devin Petersohn <devin-petersohn@users.noreply.github.com>
  • Loading branch information
dchigarev and devin-petersohn authored Jun 23, 2020
1 parent 764c337 commit 4f95b41
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docs/supported_apis/series_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ the related section on `Defaulting to pandas`_.
+-----------------------------+---------------------------------+
| ``reorder_levels`` | D |
+-----------------------------+---------------------------------+
| ``repeat`` | D |
| ``repeat`` | Y |
+-----------------------------+---------------------------------+
| ``replace`` | Y |
+-----------------------------+---------------------------------+
Expand Down
22 changes: 22 additions & 0 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,28 @@ def get_dummies(self, columns, **kwargs):
"""
pass

@abc.abstractmethod
def repeat(self, repeats):
"""
Repeat elements of a Series.
Returns a new Series where each element of the current Series
is repeated consecutively a given number of times.
Parameters
----------
repeats : int or array of ints
The number of repetitions for each element. This should be a
non-negative integer. Repeating 0 times will return an empty
Series.
Returns
-------
Series
Newly created Series with repeated elements.
"""
pass

# Indexing
@abc.abstractmethod
def view(self, index=None, columns=None):
Expand Down
9 changes: 9 additions & 0 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,15 @@ def is_monotonic_decreasing(self):
)
)

def repeat(self, repeats):
def map_fn(df):
return pandas.DataFrame(df.squeeze().repeat(repeats))

if isinstance(repeats, int) or (is_list_like(repeats) and len(repeats) == 1):
return MapFunction.register(map_fn, validate_index=True)(self)
else:
return self.__constructor__(self._modin_frame._apply_full_axis(0, map_fn))

# END Map partitions operations

# String map partitions operations
Expand Down
30 changes: 23 additions & 7 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,15 +804,21 @@ def _map_reduce(self, axis, map_func, reduce_func=None):
)
return self._compute_map_reduce_metadata(axis, reduce_parts)

def _map(self, func, dtypes=None):
def _map(self, func, dtypes=None, validate_index=False):
"""Perform a function that maps across the entire dataset.
Args:
func: The function to apply.
dtypes: (optional) The data types for the result. This is an optimization
Pamareters
----------
func : callable
The function to apply.
dtypes :
(optional) The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
Returns:
validate_index : bool, (default False)
Is index validation required after performing `func` on partitions.
Returns
-------
A new dataframe.
"""
new_partitions = self._frame_mgr_cls.map_partitions(self._partitions, func)
Expand All @@ -822,11 +828,21 @@ def _map(self, func, dtypes=None):
dtypes = pandas.Series(
[np.dtype(dtypes)] * len(self.columns), index=self.columns
)
if validate_index:
new_index = self._frame_mgr_cls.get_indices(
0, new_partitions, lambda df: df.index
)
else:
new_index = self.index
if len(new_index) != len(self.index):
new_row_lengths = None
else:
new_row_lengths = self._row_lengths
return self.__constructor__(
new_partitions,
self.index,
new_index,
self.columns,
self._row_lengths,
new_row_lengths,
self._column_widths,
dtypes=dtypes,
)
Expand Down
28 changes: 27 additions & 1 deletion modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,33 @@ def rename(
return result

def repeat(self, repeats, axis=None):
return self._default_to_pandas(pandas.Series.repeat, repeats, axis=axis)
"""
Repeat elements of a Series.
Returns a new Series where each element of the current Series
is repeated consecutively a given number of times.
Parameters
----------
repeats : int or array of ints
The number of repetitions for each element. This should be a
non-negative integer. Repeating 0 times will return an empty
Series.
axis : None
Must be ``None``. Has no effect but is accepted for compatibility
with numpy.
Returns
-------
Series
Newly created Series with repeated elements.
"""
if (isinstance(repeats, int) and repeats == 0) or (
is_list_like(repeats) and len(repeats) == 1 and repeats[0] == 0
):
return self.__constructor__()

return self.__constructor__(query_compiler=self._query_compiler.repeat(repeats))

def reset_index(self, level=None, drop=False, name=None, inplace=False):
if drop and level is None:
Expand Down
34 changes: 1 addition & 33 deletions modin/pandas/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
bool_arg_values,
int_arg_keys,
int_arg_values,
eval_general,
)

pd.DEFAULT_NPARTITIONS = 4
Expand All @@ -62,39 +63,6 @@
matplotlib.use("Agg")


def eval_general(modin_df, pandas_df, operation, comparator=df_equals, **kwargs):
md_kwargs, pd_kwargs = {}, {}

def execute_callable(fn, md_kwargs={}, pd_kwargs={}):
try:
pd_result = fn(pandas_df, **pd_kwargs)
except Exception as e:
with pytest.raises(type(e)):
# repr to force materialization
repr(fn(modin_df, **md_kwargs))
else:
md_result = fn(modin_df, **md_kwargs)
return md_result, pd_result

for key, value in kwargs.items():
if callable(value):
values = execute_callable(value)
# that means, that callable raised an exception
if values is None:
return
else:
md_value, pd_value = values
else:
md_value, pd_value = value, value

md_kwargs[key] = md_value
pd_kwargs[key] = pd_value

values = execute_callable(operation, md_kwargs=md_kwargs, pd_kwargs=pd_kwargs)
if values is not None:
comparator(*values)


def eval_insert(modin_df, pandas_df, **kwargs):
_kwargs = {"loc": 0, "col": "New column"}
_kwargs.update(kwargs)
Expand Down
27 changes: 23 additions & 4 deletions modin/pandas/test/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
int_arg_values,
encoding_types,
categories_equals,
eval_general,
)

pd.DEFAULT_NPARTITIONS = 4
Expand Down Expand Up @@ -2271,12 +2272,30 @@ def test_reorder_levels():

@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize(
"repeats", [2, 3, 4], ids=["repeats_{}".format(i) for i in [2, 3, 4]]
"repeats", [0, 2, 3, 4], ids=["repeats_{}".format(i) for i in [0, 2, 3, 4]]
)
def test_repeat(data, repeats):
modin_series, pandas_series = create_test_series(data)
with pytest.warns(UserWarning):
df_equals(modin_series.repeat(repeats), pandas_series.repeat(repeats))
eval_general(pd.Series(data), pandas.Series(data), lambda df: df.repeat(repeats))


@pytest.mark.parametrize("data", [np.arange(256)])
@pytest.mark.parametrize(
"repeats",
[
[0],
[2],
[3],
[4],
np.arange(256),
[0] * 64 + [2] * 64 + [3] * 32 + [4] * 32 + [5] * 64,
[2] * 257,
[2] * 128,
],
)
def test_repeat_lists(data, repeats):
eval_general(
pd.Series(data), pandas.Series(data), lambda df: df.repeat(repeats),
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down
34 changes: 34 additions & 0 deletions modin/pandas/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import pytest
import copy
import numpy as np
import pandas
Expand Down Expand Up @@ -551,3 +552,36 @@ def check_df_columns_have_nans(df, cols):
and cols in df.columns
and df[cols].hasnans
)


def eval_general(modin_df, pandas_df, operation, comparator=df_equals, **kwargs):
md_kwargs, pd_kwargs = {}, {}

def execute_callable(fn, md_kwargs={}, pd_kwargs={}):
try:
pd_result = fn(pandas_df, **pd_kwargs)
except Exception as e:
with pytest.raises(type(e)):
# repr to force materialization
repr(fn(modin_df, **md_kwargs))
else:
md_result = fn(modin_df, **md_kwargs)
return md_result, pd_result

for key, value in kwargs.items():
if callable(value):
values = execute_callable(value)
# that means, that callable raised an exception
if values is None:
return
else:
md_value, pd_value = values
else:
md_value, pd_value = value, value

md_kwargs[key] = md_value
pd_kwargs[key] = pd_value

values = execute_callable(operation, md_kwargs=md_kwargs, pd_kwargs=pd_kwargs)
if values is not None:
comparator(*values)

0 comments on commit 4f95b41

Please sign in to comment.