diff --git a/docs/supported_apis/series_supported.rst b/docs/supported_apis/series_supported.rst
index 4849f66b921..47e19448861 100644
--- a/docs/supported_apis/series_supported.rst
+++ b/docs/supported_apis/series_supported.rst
@@ -468,7 +468,7 @@ the related section on `Defaulting to pandas`_.
 +-----------------------------+---------------------------------+
 | ``valid``                   | D                               |
 +-----------------------------+---------------------------------+
-| ``value_counts``            | D                               |
+| ``value_counts``            | Y                               |
 +-----------------------------+---------------------------------+
 | ``values``                  | Y                               |
 +-----------------------------+---------------------------------+
diff --git a/docs/supported_apis/utilities_supported.rst b/docs/supported_apis/utilities_supported.rst
index 894a8a97f6b..7475425e6de 100644
--- a/docs/supported_apis/utilities_supported.rst
+++ b/docs/supported_apis/utilities_supported.rst
@@ -21,7 +21,7 @@ default to pandas.
 +---------------------------+---------------------------------+----------------------------------------------------+
 | `pd.unique`_              | Y                               |                                                    |
 +---------------------------+---------------------------------+----------------------------------------------------+
-| ``pd.value_counts``       | D                               |                                                    |
+| ``pd.value_counts``       | Y                               |                                                    |
 +---------------------------+---------------------------------+----------------------------------------------------+
 | `pd.cut`_                 | D                               |                                                    |
 +---------------------------+---------------------------------+----------------------------------------------------+
diff --git a/modin/backends/base/query_compiler.py b/modin/backends/base/query_compiler.py
index f5fef5d45d2..d030243fe9b 100644
--- a/modin/backends/base/query_compiler.py
+++ b/modin/backends/base/query_compiler.py
@@ -466,6 +466,10 @@ def unique(self, **kwargs):
 
     # END Abstract map partitions operations
 
+    def value_counts(self, **kwargs):
+        """Count unique values; concrete backend compilers override this."""
+        pass
+
     # Abstract map partitions across select indices
     @abc.abstractmethod
     def astype(self, col_dtypes, **kwargs):
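The pandas backend below fills in this stub as a MapReduce operation: every partition counts its own values (map), then the partial counts are aligned on their index and summed (reduce). A minimal sketch of that decomposition using plain pandas objects (the partition variables are illustrative, not Modin's actual partition API)::

    import pandas

    part1 = pandas.Series([1, 2, 2])
    part2 = pandas.Series([2, 3])

    # Map phase: count values independently in each partition.
    mapped = [part.value_counts() for part in (part1, part2)]

    # Reduce phase: align the partial counts on their index and sum them,
    # mirroring df.squeeze(axis=1).groupby(df.index, sort=False).sum() below.
    combined = pandas.concat(mapped)
    print(combined.groupby(combined.index, sort=False).sum())
    # 2 -> 3, 1 -> 1, 3 -> 1 (tie order may vary in this plain-pandas sketch)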
diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py
index 3c5428589bb..3cf987bf420 100644
--- a/modin/backends/pandas/query_compiler.py
+++ b/modin/backends/pandas/query_compiler.py
@@ -509,6 +509,100 @@ def is_monotonic_decreasing(self):
             ),
         )
 
+    def value_counts(self, **kwargs):
+        """
+        Return a QueryCompiler of Series containing counts of unique values.
+
+        Returns
+        -------
+        PandasQueryCompiler
+        """
+        if kwargs.get("bins", None) is not None:
+            new_modin_frame = self._modin_frame._apply_full_axis(
+                0, lambda df: df.squeeze(axis=1).value_counts(**kwargs)
+            )
+            return self.__constructor__(new_modin_frame)
+
+        def map_func(df, *args, **kwargs):
+            return df.squeeze(axis=1).value_counts(**kwargs)
+
+        def reduce_func(df, *args, **kwargs):
+            normalize = kwargs.get("normalize", False)
+            sort = kwargs.get("sort", True)
+            ascending = kwargs.get("ascending", False)
+            dropna = kwargs.get("dropna", True)
+
+            # Read-only (e.g. Arrow-backed) buffers make the groupby raise a
+            # ValueError; try the fast path first rather than copying every time.
+            try:
+                result = df.squeeze(axis=1).groupby(df.index, sort=False).sum()
+            except ValueError:
+                result = df.copy().squeeze(axis=1).groupby(df.index, sort=False).sum()
+
+            if not dropna and np.nan in df.index:
+                result = result.append(
+                    pandas.Series(
+                        [df.squeeze(axis=1).loc[[np.nan]].sum()], index=[np.nan]
+                    )
+                )
+            if normalize:
+                result = result / df.squeeze(axis=1).sum()
+
+            result = result.sort_values(ascending=ascending) if sort else result
+
+            # We want to sort both values and indices of the result object.
+            # This function will sort indices for equal values.
+            def sort_index_for_equal_values(result, ascending):
+                """
+                Sort the indices of the result object for equal values.
+
+                Parameters
+                ----------
+                result : pandas.Series or pandas.DataFrame with one column
+                    The object whose indices are sorted for equal values.
+                ascending : bool
+                    Sort in ascending order if True, descending if False.
+
+                Returns
+                -------
+                pandas.DataFrame
+                    A new DataFrame with sorted indices.
+                """
+                is_range = False
+                is_end = False
+                i = 0
+                new_index = np.empty(len(result), dtype=type(result.index))
+                while i < len(result):
+                    j = i
+                    if i < len(result) - 1:
+                        while result[result.index[i]] == result[result.index[i + 1]]:
+                            i += 1
+                            if is_range is False:
+                                is_range = True
+                            if i == len(result) - 1:
+                                is_end = True
+                                break
+                    if is_range:
+                        k = j
+                        for val in sorted(
+                            result.index[j : i + 1], reverse=not ascending
+                        ):
+                            new_index[k] = val
+                            k += 1
+                        if is_end:
+                            break
+                        is_range = False
+                    else:
+                        new_index[j] = result.index[j]
+                    i += 1
+                return pandas.DataFrame(result, index=new_index)
+
+            return sort_index_for_equal_values(result, ascending)
+
+        return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
+            self, **kwargs
+        )
+
     # END MapReduce operations
 
     # Reduction operations
diff --git a/modin/data_management/functions/mapreducefunction.py b/modin/data_management/functions/mapreducefunction.py
index d4a99e846e3..aace46679ea 100644
--- a/modin/data_management/functions/mapreducefunction.py
+++ b/modin/data_management/functions/mapreducefunction.py
@@ -18,6 +18,7 @@ class MapReduceFunction(Function):
     @classmethod
     def call(cls, map_function, reduce_function, **call_kwds):
         def caller(query_compiler, *args, **kwargs):
+            preserve_index = call_kwds.pop("preserve_index", True)
             return query_compiler.__constructor__(
                 query_compiler._modin_frame._map_reduce(
                     call_kwds.get("axis")
@@ -25,6 +26,7 @@ def caller(query_compiler, *args, **kwargs):
                     else kwargs.get("axis"),
                     lambda x: map_function(x, *args, **kwargs),
                     lambda y: reduce_function(y, *args, **kwargs),
+                    preserve_index=preserve_index,
                 )
             )
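Note that the new ``preserve_index`` flag is popped from ``call_kwds`` so it never leaks into the axis lookup. A hypothetical registration, to show the shape of the API (``count_nonnull`` is a made-up name, and the import path is assumed to follow the module layout shown in the diff above)::

    from modin.data_management.functions import MapReduceFunction

    count_nonnull = MapReduceFunction.register(
        # Map phase: count non-null cells in each column partition.
        lambda df, **kwargs: df.count(),
        # Reduce phase: sum the partial counts from the partitions.
        lambda df, **kwargs: df.sum(),
        preserve_index=True,  # keep the original column labels on the result
    )
    # Calling count_nonnull(query_compiler) returns a new query compiler.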
diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py
index a09e01f3f63..acdf774db7e 100644
--- a/modin/engines/base/frame/data.py
+++ b/modin/engines/base/frame/data.py
@@ -780,16 +780,26 @@ def _fold_reduce(self, axis, func):
         )
         return self._compute_map_reduce_metadata(axis, new_parts)
 
-    def _map_reduce(self, axis, map_func, reduce_func=None):
-        """Apply function that will reduce the data to a Pandas Series.
+    def _map_reduce(self, axis, map_func, reduce_func=None, preserve_index=True):
+        """
+        Apply function that will reduce the data to a Pandas Series.
 
-        Args:
-            axis: 0 for columns and 1 for rows. Default is 0.
-            map_func: Callable function to map the dataframe.
-            reduce_func: Callable function to reduce the dataframe. If none,
-                then apply map_func twice.
+        Parameters
+        ----------
+        axis : 0 or 1
+            0 for columns and 1 for rows.
+        map_func : callable
+            Callable function to map the dataframe.
+        reduce_func : callable
+            Callable function to reduce the dataframe.
+            If None, map_func is applied twice. Default is None.
+        preserve_index : bool
+            Whether to preserve the index after the map and
+            reduce phases. Default is True.
 
-        Return:
+        Returns
+        -------
+        BasePandasFrame
             A new dataframe.
         """
         map_func = self._build_mapreduce_func(axis, map_func)
@@ -802,7 +812,20 @@ def _map_reduce(self, axis, map_func, reduce_func=None):
         reduce_parts = self._frame_mgr_cls.map_axis_partitions(
             axis, map_parts, reduce_func
         )
-        return self._compute_map_reduce_metadata(axis, reduce_parts)
+        if preserve_index:
+            return self._compute_map_reduce_metadata(axis, reduce_parts)
+        else:
+            if axis == 0:
+                new_index = ["__reduced__"]
+                new_columns = self._frame_mgr_cls.get_indices(
+                    1, reduce_parts, lambda df: df.columns
+                )
+            else:
+                new_index = self._frame_mgr_cls.get_indices(
+                    0, reduce_parts, lambda df: df.index
+                )
+                new_columns = ["__reduced__"]
+            return self.__constructor__(reduce_parts, new_index, new_columns)
 
     def _map(self, func, dtypes=None, validate_index=False):
         """Perform a function that maps across the entire dataset.
diff --git a/modin/pandas/__init__.py b/modin/pandas/__init__.py
index f42dd6e061e..470a80e9ff4 100644
--- a/modin/pandas/__init__.py
+++ b/modin/pandas/__init__.py
@@ -27,7 +27,6 @@
 from pandas import (
     eval,
-    value_counts,
     cut,
     factorize,
     test,
@@ -132,6 +131,7 @@
     pivot,
     to_numeric,
     unique,
+    value_counts,
 )
 from .plotting import Plotting as plotting
 from .. import execution_engine, Publisher
@@ -302,7 +302,6 @@ def _update_engine(publisher: Publisher):
     "json_normalize",
     "concat",
     "eval",
-    "value_counts",
     "cut",
     "factorize",
     "test",
@@ -382,6 +381,7 @@ def _update_engine(publisher: Publisher):
     "pivot",
     "to_numeric",
     "unique",
+    "value_counts",
     "datetime",
     "NamedAgg",
     "DEFAULT_NPARTITIONS",
diff --git a/modin/pandas/general.py b/modin/pandas/general.py
index 071b5dafc1d..d65b8e9ac35 100644
--- a/modin/pandas/general.py
+++ b/modin/pandas/general.py
@@ -288,3 +288,39 @@ def unique(values):
         The unique values returned as a NumPy array.
     """
     return Series(values).unique()
+
+
+def value_counts(
+    values, sort=True, ascending=False, normalize=False, bins=None, dropna=True
+):
+    """
+    Compute a histogram of the counts of non-null values.
+
+    Parameters
+    ----------
+    values : ndarray (1-d)
+    sort : bool, default True
+        Sort by values.
+    ascending : bool, default False
+        Sort in ascending order.
+    normalize : bool, default False
+        If True, compute a relative histogram.
+    bins : integer, optional
+        Rather than count values, group them into half-open bins,
+        a convenience for ``pd.cut``; only works with numeric data.
+    dropna : bool, default True
+        Don't include counts of NaN.
+
+    Returns
+    -------
+    Series
+
+    Notes
+    -----
+    The indices of the resulting object will be in descending
+    (or ascending, if ascending=True) order for equal values. This
+    differs slightly from pandas, where they appear in arbitrary order.
+    """
+    return Series(values).value_counts(
+        sort=sort, ascending=ascending, normalize=normalize, bins=bins, dropna=dropna,
+    )
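With the plumbing above in place, the new top-level entry point mirrors its pandas counterpart. A small usage sketch of the API added in this file (the output ordering for tied counts follows the Notes in the docstring)::

    import numpy as np
    import modin.pandas as pd

    values = np.array([3, 1, 2, 3, 4, np.nan])
    pd.value_counts(values)                  # 3.0 counted twice; NaN dropped
    pd.value_counts(values, normalize=True)  # relative frequencies
    pd.value_counts(values, bins=3)          # group into 3 half-open bins via pd.cut
    pd.value_counts(values, dropna=False)    # include the NaN count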
+ """ + return Series(values).value_counts( + sort=sort, ascending=ascending, normalize=normalize, bins=bins, dropna=dropna, + ) diff --git a/modin/pandas/series.py b/modin/pandas/series.py index f591d725d9e..3ab99338a4d 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -1385,13 +1385,46 @@ def update(self, other): def value_counts( self, normalize=False, sort=True, ascending=False, bins=None, dropna=True ): - return self._default_to_pandas( - pandas.Series.value_counts, - normalize=normalize, - sort=sort, - ascending=ascending, - bins=bins, - dropna=dropna, + """ + Return a Series containing counts of unique values. + + The resulting object will be in descending order so that the + first element is the most frequently-occurring element. + Excludes NA values by default. + + Parameters + ---------- + normalize : bool, default False + If True then the object returned will contain the relative + frequencies of the unique values. + sort : bool, default True + Sort by frequencies. + ascending : bool, default False + Sort in ascending order. + bins : int, optional + Rather than count values, group them into half-open bins, + a convenience for ``pd.cut``, only works with numeric data. + dropna : bool, default True + Don't include counts of NaN. + + Returns + ------- + Series + + Notes + ----- + The indices of resulting object will be in descending + (ascending, if ascending=True) order for equal values. + It slightly differ from pandas where indices are located in random order. + """ + return self.__constructor__( + query_compiler=self._query_compiler.value_counts( + normalize=normalize, + sort=sort, + ascending=ascending, + bins=bins, + dropna=dropna, + ) ) def view(self, dtype=None): diff --git a/modin/pandas/test/test_general.py b/modin/pandas/test/test_general.py index 6bfb6b29624..3ce93dfd119 100644 --- a/modin/pandas/test/test_general.py +++ b/modin/pandas/test/test_general.py @@ -319,6 +319,57 @@ def test_unique(): assert_array_equal(modin_result, pandas_result) +@pytest.mark.parametrize("normalize, bins, dropna", [(True, 3, False)]) +def test_value_counts(normalize, bins, dropna): + def sort_index_for_equal_values(result, ascending): + is_range = False + is_end = False + i = 0 + new_index = np.empty(len(result), dtype=type(result.index)) + while i < len(result): + j = i + if i < len(result) - 1: + while result[result.index[i]] == result[result.index[i + 1]]: + i += 1 + if is_range is False: + is_range = True + if i == len(result) - 1: + is_end = True + break + if is_range: + k = j + for val in sorted(result.index[j : i + 1], reverse=not ascending): + new_index[k] = val + k += 1 + if is_end: + break + is_range = False + else: + new_index[j] = result.index[j] + i += 1 + return pandas.Series(result, index=new_index) + + # We sort indices for pandas result because of issue #1650 + values = np.array([3, 1, 2, 3, 4, np.nan]) + modin_result = pd.value_counts(values, normalize=normalize, ascending=False) + pandas_result = sort_index_for_equal_values( + pandas.value_counts(values, normalize=normalize, ascending=False), False + ) + df_equals(modin_result, pandas_result) + + modin_result = pd.value_counts(values, bins=bins, ascending=False) + pandas_result = sort_index_for_equal_values( + pandas.value_counts(values, bins=bins, ascending=False), False + ) + df_equals(modin_result, pandas_result) + + modin_result = pd.value_counts(values, dropna=dropna, ascending=True) + pandas_result = sort_index_for_equal_values( + pandas.value_counts(values, dropna=dropna, ascending=True), True + 
+    )
+    df_equals(modin_result, pandas_result)
+
+
 def test_to_datetime():
     # DataFrame input for to_datetime
     modin_df = pd.DataFrame({"year": [2015, 2016], "month": [2, 3], "day": [4, 5]})
diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py
index 02ec0955d2b..0f0ff379530 100644
--- a/modin/pandas/test/test_series.py
+++ b/modin/pandas/test/test_series.py
@@ -2889,12 +2889,55 @@ def test_update(data, other_data):
     df_equals(modin_series, pandas_series)
 
 
-@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
-def test_value_counts(data):
-    modin_series, pandas_series = create_test_series(data)
+@pytest.mark.parametrize("normalize, bins, dropna", [(True, 3, False)])
+def test_value_counts(normalize, bins, dropna):
+    def sort_index_for_equal_values(result, ascending):
+        is_range = False
+        is_end = False
+        i = 0
+        new_index = np.empty(len(result), dtype=type(result.index))
+        while i < len(result):
+            j = i
+            if i < len(result) - 1:
+                while result[result.index[i]] == result[result.index[i + 1]]:
+                    i += 1
+                    if is_range is False:
+                        is_range = True
+                    if i == len(result) - 1:
+                        is_end = True
+                        break
+            if is_range:
+                k = j
+                for val in sorted(result.index[j : i + 1], reverse=not ascending):
+                    new_index[k] = val
+                    k += 1
+                if is_end:
+                    break
+                is_range = False
+            else:
+                new_index[j] = result.index[j]
+            i += 1
+        return pandas.Series(result, index=new_index)
+
+    # We sort indices for the pandas result because of issue #1650
+    modin_series, pandas_series = create_test_series(test_data_values[0])
+    modin_result = modin_series.value_counts(normalize=normalize, ascending=False)
+    pandas_result = sort_index_for_equal_values(
+        pandas_series.value_counts(normalize=normalize, ascending=False), False
+    )
+    df_equals(modin_result, pandas_result)
 
-    with pytest.warns(UserWarning):
-        modin_series.value_counts()
+    modin_result = modin_series.value_counts(bins=bins, ascending=False)
+    pandas_result = sort_index_for_equal_values(
+        pandas_series.value_counts(bins=bins, ascending=False), False
+    )
+    df_equals(modin_result, pandas_result)
+
+    modin_result = modin_series.value_counts(dropna=dropna, ascending=True)
+    pandas_result = sort_index_for_equal_values(
+        pandas_series.value_counts(dropna=dropna, ascending=True), True
+    )
+    df_equals(modin_result, pandas_result)
 
 
 @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
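For reference, a sketch of the deterministic tie ordering these tests rely on; the expected index assumes the tie-sorting behavior described in the Notes sections above::

    import numpy as np
    import modin.pandas as pd

    values = np.array([3, 1, 2, 3, 4])
    # 3 appears twice; 1, 2 and 4 are tied with one occurrence each. With the
    # default ascending=False, tied counts come back with their indices sorted
    # in descending order, so the expected index is [3, 4, 2, 1]. Plain pandas
    # may return the tied entries in any order (see issue #1650).
    print(pd.value_counts(values).index.tolist())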