Add value_counts implementation for Series and as free function #1535

Merged
2 changes: 1 addition & 1 deletion docs/supported_apis/series_supported.rst
@@ -468,7 +468,7 @@ the related section on `Defaulting to pandas`_.
+-----------------------------+---------------------------------+
| ``valid`` | D |
+-----------------------------+---------------------------------+
| ``value_counts`` | D |
| ``value_counts`` | Y |
+-----------------------------+---------------------------------+
| ``values`` | Y |
+-----------------------------+---------------------------------+
2 changes: 1 addition & 1 deletion docs/supported_apis/utilities_supported.rst
@@ -21,7 +21,7 @@ default to pandas.
+---------------------------+---------------------------------+----------------------------------------------------+
| `pd.unique`_ | Y | |
+---------------------------+---------------------------------+----------------------------------------------------+
| ``pd.value_counts`` | D | |
| ``pd.value_counts`` | Y | |
+---------------------------+---------------------------------+----------------------------------------------------+
| `pd.cut`_ | D | |
+---------------------------+---------------------------------+----------------------------------------------------+
3 changes: 3 additions & 0 deletions modin/backends/base/query_compiler.py
@@ -466,6 +466,9 @@ def unique(self, **kwargs):

# END Abstract map partitions operations

def value_counts(self, **kwargs):
pass

# Abstract map partitions across select indices
@abc.abstractmethod
def astype(self, col_dtypes, **kwargs):
94 changes: 94 additions & 0 deletions modin/backends/pandas/query_compiler.py
@@ -509,6 +509,100 @@ def is_monotonic_decreasing(self):
),
)

def value_counts(self, **kwargs):
"""
Return a QueryCompiler of Series containing counts of unique values.

Returns
-------
PandasQueryCompiler
"""
if kwargs.get("bins", None) is not None:
Review comment (Collaborator):
Do bins not work with MapReduce style? How is the designator for the bin determined?

Reply from @YarShev (Collaborator, Author), Jun 23, 2020:
The designator for the bin is determined as follows:

>>> import pandas
>>> import numpy as np
>>> s = pandas.Series([3, 1, 2, 3, 4, np.nan])
>>> r = s.value_counts(bins=3)
>>> r
(2.0, 3.0]      2
(0.996, 2.0]    2
(3.0, 4.0]      1
dtype: int64
>>> r.index
IntervalIndex([(2.0, 3.0], (0.996, 2.0], (3.0, 4.0]],
              closed='right',
              dtype='interval[float64]')
>>> r.index[0]
Interval(2.0, 3.0, closed='right')

MapReduce style doesn't work for bins because equal values may be put into different bins at the map stage (when we have more than one partition). Then, when we do a groupby by index, we will not be able to group bins containing the same values.
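To make the failure mode concrete, here is a small standalone sketch (plain pandas over two hypothetical partitions, not Modin internals) showing that each partition derives its own bin edges, so the resulting IntervalIndex labels do not line up and a groupby-by-index sum at the reduce stage could not merge them:

import pandas
import numpy as np

# Pretend the Series [3, 1, 2, 3, 4, NaN] is split across two partitions.
part1 = pandas.Series([3, 1, 2])
part2 = pandas.Series([3, 4, np.nan])

# Map stage: each partition computes bins from its own min/max.
counts1 = part1.value_counts(bins=3)
counts2 = part2.value_counts(bins=3)

print(counts1.index)  # intervals derived from the range [1, 3]
print(counts2.index)  # intervals derived from the range [3, 4] -- different labels

# Reduce stage: grouping by index cannot combine these partial results, because
# the value 3 was counted under different intervals in the two partitions.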

new_modin_frame = self._modin_frame._apply_full_axis(
0, lambda df: df.squeeze(axis=1).value_counts(**kwargs)
)
return self.__constructor__(new_modin_frame)

def map_func(df, *args, **kwargs):
return df.squeeze(axis=1).value_counts(**kwargs)

def reduce_func(df, *args, **kwargs):
normalize = kwargs.get("normalize", False)
sort = kwargs.get("sort", True)
ascending = kwargs.get("ascending", False)
dropna = kwargs.get("dropna", True)

try:
result = df.squeeze(axis=1).groupby(df.index, sort=False).sum()
# This will happen with Arrow buffer read-only errors. We don't want to copy
# all the time, so this will try to fast-path the code first.
except ValueError:
result = df.copy().squeeze(axis=1).groupby(df.index, sort=False).sum()
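# Note: a groupby over the index silently drops NaN labels, so the
# dropna=False branch below re-appends the NaN count explicitly.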

if not dropna and np.nan in df.index:
result = result.append(
pandas.Series(
[df.squeeze(axis=1).loc[[np.nan]].sum()], index=[np.nan]
)
)
if normalize:
result = result / df.squeeze(axis=1).sum()

result = result.sort_values(ascending=ascending) if sort else result

# We want to sort both values and indices of the result object.
# This function will sort indices for equal values.
def sort_index_for_equal_values(result, ascending):
"""
Sort indices for equal values of result object.

Parameters
----------
result : pandas.Series or pandas.DataFrame with one column
The object whose indices need to be sorted for equal values.
ascending : boolean
Sort in ascending (if it is True) or descending (if it is False) order.

Returns
-------
pandas.DataFrame
A new DataFrame with sorted indices.
"""
is_range = False
is_end = False
i = 0
new_index = np.empty(len(result), dtype=type(result.index))
while i < len(result):
j = i
if i < len(result) - 1:
while result[result.index[i]] == result[result.index[i + 1]]:
i += 1
if is_range is False:
is_range = True
if i == len(result) - 1:
is_end = True
break
if is_range:
k = j
for val in sorted(
result.index[j : i + 1], reverse=not ascending
):
new_index[k] = val
k += 1
if is_end:
break
is_range = False
else:
new_index[j] = result.index[j]
i += 1
return pandas.DataFrame(result, index=new_index)

return sort_index_for_equal_values(result, ascending)

return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
Review comment (Collaborator):
Thinking about how to keep the query compiler clean of these low-level implementation details: what if we created a file backends/pandas/kernels.py, added these functions there, and imported them? I have been thinking about how to keep things clean; maybe it is better left to another PR, but what do you think about this?

Reply from @YarShev (Collaborator, Author), Jun 23, 2020:
Yes, this is a great thought. I think a file backends/pandas/kernels.py containing these implementation details is what we need to keep the query compiler clean. Importing functions from that file into the query compiler is a very nice solution.

self, **kwargs
)

# END MapReduce operations

# Reduction operations
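For readers skimming this diff, here is a minimal standalone sketch (plain pandas over two hypothetical partitions, not the actual Modin machinery) of the non-bins MapReduce path used above: value_counts per partition at the map stage, then a sum grouped by index at the reduce stage:

import pandas

part1 = pandas.Series([3, 1, 2])
part2 = pandas.Series([3, 4, 3])

# Map stage: count values independently in each partition.
mapped = [part1.value_counts(), part2.value_counts()]

# Reduce stage: concatenate the partial counts and sum the counts that share a label.
reduced = pandas.concat(mapped).groupby(level=0, sort=False).sum()
print(reduced.sort_values(ascending=False))
# 3 ends up with a total count of 3; the values 1, 2 and 4 each end up with 1.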
2 changes: 2 additions & 0 deletions modin/data_management/functions/mapreducefunction.py
@@ -18,13 +18,15 @@ class MapReduceFunction(Function):
@classmethod
def call(cls, map_function, reduce_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
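# preserve_index is a registration-time option (default True); it is passed
# through to _map_reduce so reductions such as value_counts, whose result index
# is built from the data itself, can read the index back from the reduce partitions.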
preserve_index = call_kwds.pop("preserve_index", True)
return query_compiler.__constructor__(
query_compiler._modin_frame._map_reduce(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
lambda x: map_function(x, *args, **kwargs),
lambda y: reduce_function(y, *args, **kwargs),
preserve_index=preserve_index,
)
)

41 changes: 32 additions & 9 deletions modin/engines/base/frame/data.py
@@ -780,16 +780,26 @@ def _fold_reduce(self, axis, func):
)
return self._compute_map_reduce_metadata(axis, new_parts)

def _map_reduce(self, axis, map_func, reduce_func=None):
"""Apply function that will reduce the data to a Pandas Series.
def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True):
"""
Apply function that will reduce the data to a Pandas Series.

Args:
axis: 0 for columns and 1 for rows. Default is 0.
map_func: Callable function to map the dataframe.
reduce_func: Callable function to reduce the dataframe. If none,
then apply map_func twice.
Parameters
----------
axis : 0 or 1
0 for columns and 1 for rows.
map_func : callable
Callable function to map the dataframe.
reduce_func : callable
Callable function to reduce the dataframe.
If None, then map_func is applied twice. Default is None.
preserve_index : boolean
The flag to preserve the index for the default behavior of the
map and reduce operations. Default is True.

Return:
Returns
-------
BasePandasFrame
A new dataframe.
"""
map_func = self._build_mapreduce_func(axis, map_func)
@@ -802,7 +812,20 @@ def _map_reduce(self, axis, map_func, reduce_func=None):
reduce_parts = self._frame_mgr_cls.map_axis_partitions(
axis, map_parts, reduce_func
)
return self._compute_map_reduce_metadata(axis, reduce_parts)
if preserve_index:
return self._compute_map_reduce_metadata(axis, reduce_parts)
else:
if axis == 0:
new_index = ["__reduced__"]
new_columns = self._frame_mgr_cls.get_indices(
1, reduce_parts, lambda df: df.columns
)
else:
new_index = self._frame_mgr_cls.get_indices(
0, reduce_parts, lambda df: df.index
)
new_columns = ["__reduced__"]
return self.__constructor__(reduce_parts, new_index, new_columns)
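A side note on why preserve_index=False is needed for value_counts: the labels of the result are the data values themselves, not anything derivable from the original frame's metadata, so they have to be read back from the reduce partitions. A tiny pandas-only illustration (not Modin code):

import pandas

s = pandas.Series([3, 1, 3], index=["a", "b", "c"])
counts = s.value_counts()
print(list(s.index))       # ['a', 'b', 'c'] -- the original labels
print(list(counts.index))  # [3, 1] -- the labels are the values, unknown until computed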

def _map(self, func, dtypes=None):
"""Perform a function that maps across the entire dataset.
4 changes: 2 additions & 2 deletions modin/pandas/__init__.py
@@ -27,7 +27,6 @@

from pandas import (
eval,
value_counts,
cut,
factorize,
test,
@@ -132,6 +131,7 @@
pivot,
to_numeric,
unique,
value_counts,
)
from .plotting import Plotting as plotting
from .. import execution_engine, Publisher
@@ -302,7 +302,6 @@ def _update_engine(publisher: Publisher):
"json_normalize",
"concat",
"eval",
"value_counts",
"cut",
"factorize",
"test",
@@ -382,6 +381,7 @@ def _update_engine(publisher: Publisher):
"pivot",
"to_numeric",
"unique",
"value_counts",
"datetime",
"NamedAgg",
"DEFAULT_NPARTITIONS",
36 changes: 36 additions & 0 deletions modin/pandas/general.py
@@ -288,3 +288,39 @@ def unique(values):
The unique values returned as a NumPy array.
"""
return Series(values).unique()


def value_counts(
values, sort=True, ascending=False, normalize=False, bins=None, dropna=True
):
"""
Compute a histogram of the counts of non-null values.

Parameters
----------
values : ndarray (1-d)
sort : bool, default True
Sort by values
ascending : bool, default False
Sort in ascending order
normalize : bool, default False
If True then compute a relative histogram
bins : integer, optional
Rather than count values, group them into half-open bins,
convenience for pd.cut, only works with numeric data
dropna : bool, default True
Don't include counts of NaN

Returns
-------
Series

Notes
-----
The indices of the resulting object will be in descending
(or ascending, if ascending=True) order for equal values.
This slightly differs from pandas, where indices for equal values are placed in arbitrary order.
"""
return Series(values).value_counts(
sort=sort, ascending=ascending, normalize=normalize, bins=bins, dropna=dropna,
)
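A quick usage sketch of the free function added above (assuming modin.pandas is importable as pd and a Modin execution engine is configured):

import numpy as np
import modin.pandas as pd

values = np.array([3, 1, 2, 3, 4, np.nan])
print(pd.value_counts(values))                   # counts of non-null values, most frequent first
print(pd.value_counts(values, normalize=True))   # relative frequencies instead of raw counts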
47 changes: 40 additions & 7 deletions modin/pandas/series.py
@@ -1359,13 +1359,46 @@ def update(self, other):
def value_counts(
self, normalize=False, sort=True, ascending=False, bins=None, dropna=True
):
return self._default_to_pandas(
pandas.Series.value_counts,
normalize=normalize,
sort=sort,
ascending=ascending,
bins=bins,
dropna=dropna,
"""
Return a Series containing counts of unique values.

The resulting object will be in descending order so that the
first element is the most frequently-occurring element.
Excludes NA values by default.

Parameters
----------
normalize : bool, default False
If True then the object returned will contain the relative
frequencies of the unique values.
sort : bool, default True
Sort by frequencies.
ascending : bool, default False
Sort in ascending order.
bins : int, optional
Rather than count values, group them into half-open bins,
a convenience for ``pd.cut``, only works with numeric data.
dropna : bool, default True
Don't include counts of NaN.

Returns
-------
Series

Notes
-----
The indices of the resulting object will be in descending
(or ascending, if ascending=True) order for equal values.
This slightly differs from pandas, where indices for equal values are placed in arbitrary order.
"""
return self.__constructor__(
query_compiler=self._query_compiler.value_counts(
normalize=normalize,
sort=sort,
ascending=ascending,
bins=bins,
dropna=dropna,
)
)
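And a matching sketch of the method form documented above (again assuming a configured Modin engine); the bins argument goes through the full-axis fallback shown earlier in the query compiler:

import numpy as np
import modin.pandas as pd

s = pd.Series([3, 1, 2, 3, 4, np.nan])
print(s.value_counts())              # 3 appears twice, the other values once
print(s.value_counts(bins=3))        # counts per half-open interval, as in pandas
print(s.value_counts(dropna=False))  # include the NaN count as well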

def view(self, dtype=None):
51 changes: 51 additions & 0 deletions modin/pandas/test/test_general.py
@@ -319,6 +319,57 @@ def test_unique():
assert_array_equal(modin_result, pandas_result)


@pytest.mark.parametrize("normalize, bins, dropna", [(True, 3, False)])
def test_value_counts(normalize, bins, dropna):
def sort_index_for_equal_values(result, ascending):
is_range = False
is_end = False
i = 0
new_index = np.empty(len(result), dtype=type(result.index))
while i < len(result):
j = i
if i < len(result) - 1:
while result[result.index[i]] == result[result.index[i + 1]]:
i += 1
if is_range is False:
is_range = True
if i == len(result) - 1:
is_end = True
break
if is_range:
k = j
for val in sorted(result.index[j : i + 1], reverse=not ascending):
new_index[k] = val
k += 1
if is_end:
break
is_range = False
else:
new_index[j] = result.index[j]
i += 1
return pandas.Series(result, index=new_index)

# We sort indices for pandas result because of issue #1650
values = np.array([3, 1, 2, 3, 4, np.nan])
modin_result = pd.value_counts(values, normalize=normalize, ascending=False)
pandas_result = sort_index_for_equal_values(
pandas.value_counts(values, normalize=normalize, ascending=False), False
)
df_equals(modin_result, pandas_result)

modin_result = pd.value_counts(values, bins=bins, ascending=False)
pandas_result = sort_index_for_equal_values(
pandas.value_counts(values, bins=bins, ascending=False), False
)
df_equals(modin_result, pandas_result)

modin_result = pd.value_counts(values, dropna=dropna, ascending=True)
pandas_result = sort_index_for_equal_values(
pandas.value_counts(values, dropna=dropna, ascending=True), True
)
df_equals(modin_result, pandas_result)


def test_to_datetime():
# DataFrame input for to_datetime
modin_df = pd.DataFrame({"year": [2015, 2016], "month": [2, 3], "day": [4, 5]})