Commit
Add value_counts implementation for Series and as free function (#1535)

Signed-off-by: Yaroslav Igoshev <yaroslav.igoshev@intel.com>
Co-authored-by: Devin Petersohn <devin.petersohn@gmail.com>
YarShev and devin-petersohn committed Jun 24, 2020
1 parent 4f95b41 commit d2b4a74
Showing 11 changed files with 310 additions and 25 deletions.
2 changes: 1 addition & 1 deletion docs/supported_apis/series_supported.rst
@@ -468,7 +468,7 @@ the related section on `Defaulting to pandas`_.
+-----------------------------+---------------------------------+
| ``valid`` | D |
+-----------------------------+---------------------------------+
| ``value_counts`` | D |
| ``value_counts`` | Y |
+-----------------------------+---------------------------------+
| ``values`` | Y |
+-----------------------------+---------------------------------+
2 changes: 1 addition & 1 deletion docs/supported_apis/utilities_supported.rst
@@ -21,7 +21,7 @@ default to pandas.
+---------------------------+---------------------------------+----------------------------------------------------+
| `pd.unique`_ | Y | |
+---------------------------+---------------------------------+----------------------------------------------------+
| ``pd.value_counts`` | D | |
| ``pd.value_counts`` | Y | |
+---------------------------+---------------------------------+----------------------------------------------------+
| `pd.cut`_ | D | |
+---------------------------+---------------------------------+----------------------------------------------------+
3 changes: 3 additions & 0 deletions modin/backends/base/query_compiler.py
@@ -466,6 +466,9 @@ def unique(self, **kwargs):

# END Abstract map partitions operations

def value_counts(self, **kwargs):
pass

# Abstract map partitions across select indices
@abc.abstractmethod
def astype(self, col_dtypes, **kwargs):
94 changes: 94 additions & 0 deletions modin/backends/pandas/query_compiler.py
@@ -509,6 +509,100 @@ def is_monotonic_decreasing(self):
),
)

def value_counts(self, **kwargs):
"""
Return a QueryCompiler of Series containing counts of unique values.
Returns
-------
PandasQueryCompiler
"""
if kwargs.get("bins", None) is not None:
new_modin_frame = self._modin_frame._apply_full_axis(
0, lambda df: df.squeeze(axis=1).value_counts(**kwargs)
)
return self.__constructor__(new_modin_frame)

def map_func(df, *args, **kwargs):
return df.squeeze(axis=1).value_counts(**kwargs)

def reduce_func(df, *args, **kwargs):
normalize = kwargs.get("normalize", False)
sort = kwargs.get("sort", True)
ascending = kwargs.get("ascending", False)
dropna = kwargs.get("dropna", True)

try:
result = df.squeeze(axis=1).groupby(df.index, sort=False).sum()
# This will happen with Arrow buffer read-only errors. We don't want to copy
# all the time, so this will try to fast-path the code first.
except ValueError:
result = df.copy().squeeze(axis=1).groupby(df.index, sort=False).sum()

if not dropna and np.nan in df.index:
result = result.append(
pandas.Series(
[df.squeeze(axis=1).loc[[np.nan]].sum()], index=[np.nan]
)
)
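# Note: at the reduce step, df holds the per-partition counts produced
# by map_func, so the denominator below is the total number of counted
# observations.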
if normalize:
result = result / df.squeeze(axis=1).sum()

result = result.sort_values(ascending=ascending) if sort else result

# We want to sort both values and indices of the result object.
# This function will sort indices for equal values.
def sort_index_for_equal_values(result, ascending):
"""
Sort indices for equal values of result object.
Parameters
----------
result : pandas.Series or pandas.DataFrame with one column
The object whose indices for equal values is needed to sort.
ascending : boolean
Sort in ascending (if it is True) or descending (if it is False) order.
Returns
-------
pandas.DataFrame
A new DataFrame with sorted indices.
"""
is_range = False  # currently inside a run of equal values
is_end = False  # the run of equal values reaches the end of the data
i = 0
new_index = np.empty(len(result), dtype=type(result.index))
while i < len(result):
j = i
if i < len(result) - 1:
# Advance i to the last element of the run of equal values
# that starts at position j.
while result[result.index[i]] == result[result.index[i + 1]]:
i += 1
if is_range is False:
is_range = True
if i == len(result) - 1:
is_end = True
break
if is_range:
# Sort the indices of the run [j, i] and write them back into
# the corresponding slots of the new index.
k = j
for val in sorted(
result.index[j : i + 1], reverse=not ascending
):
new_index[k] = val
k += 1
if is_end:
break
is_range = False
else:
# A unique value keeps its original index position.
new_index[j] = result.index[j]
i += 1
return pandas.DataFrame(result, index=new_index)

return sort_index_for_equal_values(result, ascending)

return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
self, **kwargs
)

# END MapReduce operations

# Reduction operations
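To make the map/reduce split above concrete, here is a minimal plain-pandas sketch of what the two steps compute, with two hypothetical in-memory partitions standing in for Modin's distributed ones (the names and the partition split are illustrative, not part of the Modin API):

import pandas

data = pandas.Series([3, 1, 2, 3, 4, 3, 1])
partitions = [data.iloc[:4], data.iloc[4:]]

# Map step: count values independently within each partition.
mapped = [part.value_counts() for part in partitions]

# Reduce step: concatenate the partial counts and sum the rows that
# share an index label, mirroring groupby(df.index, sort=False).sum()
# in reduce_func above.
combined = pandas.concat(mapped)
result = combined.groupby(combined.index, sort=False).sum()
print(result.sort_values(ascending=False))
# 3    3
# 1    2
# 2    1
# 4    1   (ties may appear in either order before the tie-sorting step)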
2 changes: 2 additions & 0 deletions modin/data_management/functions/mapreducefunction.py
@@ -18,13 +18,15 @@ class MapReduceFunction(Function):
@classmethod
def call(cls, map_function, reduce_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
preserve_index = call_kwds.pop("preserve_index", True)
return query_compiler.__constructor__(
query_compiler._modin_frame._map_reduce(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
lambda x: map_function(x, *args, **kwargs),
lambda y: reduce_function(y, *args, **kwargs),
preserve_index=preserve_index,
)
)

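The change above lets a registered map-reduce pair opt out of index preservation: caller() pops the framework-level preserve_index option from the registration kwargs before forwarding the rest. A self-contained toy of the same pattern (all names illustrative, not Modin's):

def call(map_function, reduce_function, **call_kwds):
    def caller(data, **kwargs):
        # Framework-level option, defaulting to True when not registered.
        preserve_index = call_kwds.pop("preserve_index", True)
        mapped = [map_function(chunk, **kwargs) for chunk in data]
        return reduce_function(mapped, **kwargs), preserve_index

    return caller

counter = call(
    lambda chunk: len(chunk),  # map: per-chunk partial result
    lambda parts: sum(parts),  # reduce: combine the partials
    preserve_index=False,
)
print(counter([[1, 2], [3, 4, 5]]))  # (5, False)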
41 changes: 32 additions & 9 deletions modin/engines/base/frame/data.py
@@ -780,16 +780,26 @@ def _fold_reduce(self, axis, func):
)
return self._compute_map_reduce_metadata(axis, new_parts)

def _map_reduce(self, axis, map_func, reduce_func=None):
"""Apply function that will reduce the data to a Pandas Series.
def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True):
"""
Apply function that will reduce the data to a Pandas Series.
Args:
axis: 0 for columns and 1 for rows. Default is 0.
map_func: Callable function to map the dataframe.
reduce_func: Callable function to reduce the dataframe. If none,
then apply map_func twice.
Parameters
----------
axis : 0 or 1
0 for columns and 1 for rows.
map_func : callable
Callable function to map the dataframe.
reduce_func : callable
Callable function to reduce the dataframe.
If None, then apply map_func twice. Default is None.
preserve_index : boolean
Whether to keep the index of the default map-reduce behavior;
if False, new axis metadata is computed from the reduced
partitions. Default is True.
Return:
Returns
-------
BasePandasFrame
A new dataframe.
"""
map_func = self._build_mapreduce_func(axis, map_func)
@@ -802,7 +812,20 @@ def _map_reduce(self, axis, map_func, reduce_func=None):
reduce_parts = self._frame_mgr_cls.map_axis_partitions(
axis, map_parts, reduce_func
)
return self._compute_map_reduce_metadata(axis, reduce_parts)
if preserve_index:
return self._compute_map_reduce_metadata(axis, reduce_parts)
else:
if axis == 0:
new_index = ["__reduced__"]
new_columns = self._frame_mgr_cls.get_indices(
1, reduce_parts, lambda df: df.columns
)
else:
new_index = self._frame_mgr_cls.get_indices(
0, reduce_parts, lambda df: df.index
)
new_columns = ["__reduced__"]
return self.__constructor__(reduce_parts, new_index, new_columns)

def _map(self, func, dtypes=None, validate_index=False):
"""Perform a function that maps across the entire dataset.
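The new preserve_index=False branch exists because a reduction like value_counts produces an index made of data values; the original frame's index metadata cannot be carried over and must instead be read off the reduced partitions (with the constant "__reduced__" label on the collapsed axis). A plain-pandas illustration of why:

import pandas

s = pandas.Series([3, 1, 3, 2], index=["a", "b", "c", "d"])
reduced = s.value_counts()
# The reduced index holds data values (e.g. [3, 1, 2]), not the original
# labels ["a", "b", "c", "d"], so fresh index metadata must be computed.
print(list(reduced.index))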
4 changes: 2 additions & 2 deletions modin/pandas/__init__.py
@@ -27,7 +27,6 @@

from pandas import (
eval,
value_counts,
cut,
factorize,
test,
@@ -132,6 +131,7 @@
pivot,
to_numeric,
unique,
value_counts,
)
from .plotting import Plotting as plotting
from .. import execution_engine, Publisher
@@ -302,7 +302,6 @@ def _update_engine(publisher: Publisher):
"json_normalize",
"concat",
"eval",
"value_counts",
"cut",
"factorize",
"test",
@@ -382,6 +381,7 @@
"pivot",
"to_numeric",
"unique",
"value_counts",
"datetime",
"NamedAgg",
"DEFAULT_NPARTITIONS",
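With the import moved, pd.value_counts now resolves to Modin's own implementation in modin/pandas/general.py instead of being re-exported from pandas. A quick way to check (the expected output is an assumption based on the file layout in this commit):

import modin.pandas as pd

print(pd.value_counts.__module__)  # expected: "modin.pandas.general"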
36 changes: 36 additions & 0 deletions modin/pandas/general.py
@@ -288,3 +288,39 @@ def unique(values):
The unique values returned as a NumPy array.
"""
return Series(values).unique()


def value_counts(
values, sort=True, ascending=False, normalize=False, bins=None, dropna=True
):
"""
Compute a histogram of the counts of non-null values.
Parameters
----------
values : ndarray (1-d)
sort : bool, default True
Sort by values.
ascending : bool, default False
Sort in ascending order.
normalize : bool, default False
If True, then compute a relative histogram.
bins : integer, optional
Rather than count values, group them into half-open bins,
a convenience for ``pd.cut``; only works with numeric data.
dropna : bool, default True
Don't include counts of NaN.
Returns
-------
Series
Notes
-----
For equal values, the indices of the resulting object will be in
descending (or ascending, if ``ascending=True``) order.
This differs slightly from pandas, where the indices for equal
values are placed in an arbitrary order.
"""
return Series(values).value_counts(
sort=sort, ascending=ascending, normalize=normalize, bins=bins, dropna=dropna,
)
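Example usage of the new free function, mirroring the call in the tests below; the tie ordering follows the Notes above (the exact rendering may vary by version):

import numpy as np
import modin.pandas as pd

values = np.array([3, 1, 2, 3, 4, np.nan])
print(pd.value_counts(values))
# 3.0    2
# 4.0    1
# 2.0    1
# 1.0    1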
47 changes: 40 additions & 7 deletions modin/pandas/series.py
@@ -1385,13 +1385,46 @@ def update(self, other):
def value_counts(
self, normalize=False, sort=True, ascending=False, bins=None, dropna=True
):
return self._default_to_pandas(
pandas.Series.value_counts,
normalize=normalize,
sort=sort,
ascending=ascending,
bins=bins,
dropna=dropna,
"""
Return a Series containing counts of unique values.
The resulting object will be in descending order so that the
first element is the most frequently-occurring element.
Excludes NA values by default.
Parameters
----------
normalize : bool, default False
If True then the object returned will contain the relative
frequencies of the unique values.
sort : bool, default True
Sort by frequencies.
ascending : bool, default False
Sort in ascending order.
bins : int, optional
Rather than count values, group them into half-open bins,
a convenience for ``pd.cut``, only works with numeric data.
dropna : bool, default True
Don't include counts of NaN.
Returns
-------
Series
Notes
-----
For equal values, the indices of the resulting object will be in
descending (or ascending, if ``ascending=True``) order.
This differs slightly from pandas, where the indices for equal
values are placed in an arbitrary order.
"""
return self.__constructor__(
query_compiler=self._query_compiler.value_counts(
normalize=normalize,
sort=sort,
ascending=ascending,
bins=bins,
dropna=dropna,
)
)

def view(self, dtype=None):
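And the method form on a distributed Series (a sketch; ties appear in descending index order per the Notes above):

import modin.pandas as pd

s = pd.Series([3, 1, 2, 3, 4, 3])
print(s.value_counts())
# 3    3
# 4    1
# 2    1
# 1    1
print(s.value_counts(normalize=True).loc[3])  # 0.5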
51 changes: 51 additions & 0 deletions modin/pandas/test/test_general.py
@@ -319,6 +319,57 @@ def test_unique():
assert_array_equal(modin_result, pandas_result)


@pytest.mark.parametrize("normalize, bins, dropna", [(True, 3, False)])
def test_value_counts(normalize, bins, dropna):
# Local copy of the tie-breaking helper from the query compiler above;
# it sorts indices within runs of equal counts.
def sort_index_for_equal_values(result, ascending):
is_range = False
is_end = False
i = 0
new_index = np.empty(len(result), dtype=type(result.index))
while i < len(result):
j = i
if i < len(result) - 1:
while result[result.index[i]] == result[result.index[i + 1]]:
i += 1
if is_range is False:
is_range = True
if i == len(result) - 1:
is_end = True
break
if is_range:
k = j
for val in sorted(result.index[j : i + 1], reverse=not ascending):
new_index[k] = val
k += 1
if is_end:
break
is_range = False
else:
new_index[j] = result.index[j]
i += 1
return pandas.Series(result, index=new_index)

# We sort indices for the pandas result because of issue #1650
values = np.array([3, 1, 2, 3, 4, np.nan])
modin_result = pd.value_counts(values, normalize=normalize, ascending=False)
pandas_result = sort_index_for_equal_values(
pandas.value_counts(values, normalize=normalize, ascending=False), False
)
df_equals(modin_result, pandas_result)

modin_result = pd.value_counts(values, bins=bins, ascending=False)
pandas_result = sort_index_for_equal_values(
pandas.value_counts(values, bins=bins, ascending=False), False
)
df_equals(modin_result, pandas_result)

modin_result = pd.value_counts(values, dropna=dropna, ascending=True)
pandas_result = sort_index_for_equal_values(
pandas.value_counts(values, dropna=dropna, ascending=True), True
)
df_equals(modin_result, pandas_result)


def test_to_datetime():
# DataFrame input for to_datetime
modin_df = pd.DataFrame({"year": [2015, 2016], "month": [2, 3], "day": [4, 5]})