diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst
index c3d67df425ba7..6499191d8c8d4 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -38,6 +38,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``sort_columns`` parameter from ``DataFrame.plot`` and ``Series.plot`` has been removed from pandas API on Spark.
 * In Spark 4.0, the default value of ``regex`` parameter for ``Series.str.replace`` has been changed from ``True`` to ``False`` from pandas API on Spark. Additionally, a single character ``pat`` with ``regex=True`` is now treated as a regular expression instead of a string literal.
 * In Spark 4.0, the resulting name from ``value_counts`` for all objects sets to ``'count'`` (or ``'proportion'`` if ``normalize=True`` was passed) from pandas API on Spark, and the index will be named after the original object.
+* In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and ``ps.read_excel`` has been removed from pandas API on Spark.


 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index fddf1bec63fcc..be6d57aa0338d 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -222,7 +222,6 @@ def read_csv(
     names: Optional[Union[str, List[str]]] = None,
     index_col: Optional[Union[str, List[str]]] = None,
     usecols: Optional[Union[List[int], List[str], Callable[[str], bool]]] = None,
-    squeeze: bool = False,
     mangle_dupe_cols: bool = True,
     dtype: Optional[Union[str, Dtype, Dict[str, Union[str, Dtype]]]] = None,
     nrows: Optional[int] = None,
@@ -262,11 +261,6 @@ def read_csv(
         from the document header row(s). If callable, the callable function
         will be evaluated against the column names, returning names where the
         callable function evaluates to `True`.
-    squeeze : bool, default False
-        If the parsed data only contains one column then return a Series.
-
-        .. deprecated:: 3.4.0
-
     mangle_dupe_cols : bool, default True
         Duplicate columns will be specified as 'X0', 'X1', ... 'XN', rather
         than 'X' ... 'X'. Passing in False will cause data to be overwritten if
@@ -466,10 +460,7 @@ def read_csv(
             for col in psdf.columns:
                 psdf[col] = psdf[col].astype(dtype)

-    if squeeze and len(psdf.columns) == 1:
-        return first_series(psdf)
-    else:
-        return psdf
+    return psdf


 def read_json(
@@ -912,7 +903,6 @@ def read_excel(
     names: Optional[List] = None,
     index_col: Optional[List[int]] = None,
     usecols: Optional[Union[int, str, List[Union[int, str]], Callable[[str], bool]]] = None,
-    squeeze: bool = False,
     dtype: Optional[Dict[str, Union[str, Dtype]]] = None,
     engine: Optional[str] = None,
     converters: Optional[Dict] = None,
@@ -985,11 +975,6 @@ def read_excel(
         * If list of string, then indicates list of column names to be parsed.
         * If callable, then evaluate each column name against it and parse the
           column if the callable returns ``True``.
-    squeeze : bool, default False
-        If the parsed data only contains one column then return a Series.
-
-        .. deprecated:: 3.4.0
-
     dtype : Type name or dict of column -> type, default None
         Data type for data or columns. E.g. {'a': np.float64, 'b': np.int32}
         Use `object` to preserve data as stored in Excel and not interpret dtype.
@@ -1142,7 +1127,7 @@ def read_excel(
     """

     def pd_read_excel(
-        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None], sq: bool
+        io_or_bin: Any, sn: Union[str, int, List[Union[str, int]], None]
     ) -> pd.DataFrame:
         return pd.read_excel(
             io=BytesIO(io_or_bin) if isinstance(io_or_bin, (bytes, bytearray)) else io_or_bin,
@@ -1151,7 +1136,6 @@ def pd_read_excel(
             names=names,
             index_col=index_col,
             usecols=usecols,
-            squeeze=sq,
             dtype=dtype,
             engine=engine,
             converters=converters,
@@ -1181,7 +1165,7 @@ def pd_read_excel(
         io_or_bin = io
         single_file = True

-    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name, sq=squeeze)
+    pdf_or_psers = pd_read_excel(io_or_bin, sn=sheet_name)

     if single_file:
         if isinstance(pdf_or_psers, dict):
@@ -1208,9 +1192,7 @@ def read_excel_on_spark(
             )

             def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
-                pdf = pd.concat(
-                    [pd_read_excel(bin, sn=sn, sq=False) for bin in pdf[pdf.columns[0]]]
-                )
+                pdf = pd.concat([pd_read_excel(bin, sn=sn) for bin in pdf[pdf.columns[0]]])

                 reset_index = pdf.reset_index()
                 for name, col in reset_index.items():
@@ -1231,11 +1213,7 @@ def output_func(pdf: pd.DataFrame) -> pd.DataFrame:
                 .mapInPandas(lambda iterator: map(output_func, iterator), schema=return_schema)
             )

-            psdf = DataFrame(psdf._internal.with_new_sdf(sdf))
-            if squeeze and len(psdf.columns) == 1:
-                return first_series(psdf)
-            else:
-                return psdf
+            return DataFrame(psdf._internal.with_new_sdf(sdf))

         if isinstance(pdf_or_psers, dict):
             return {
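Note (illustration only, not part of the diff): with ``squeeze`` gone, code that relied on ``ps.read_csv(..., squeeze=True)`` returning a Series can simply select the single column from the returned DataFrame. A minimal sketch; the file path and column name below are made up:

    import pyspark.pandas as ps

    # PySpark 3.x: psser = ps.read_csv("data.csv", usecols=["name"], squeeze=True)
    psdf = ps.read_csv("data.csv", usecols=["name"])  # read_csv now always returns a DataFrame
    psser = psdf["name"]  # selecting the lone column gives the Series that squeeze=True used to return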
diff --git a/python/pyspark/pandas/tests/test_csv.py b/python/pyspark/pandas/tests/test_csv.py
index b118f7cf8a973..a367dd72be1e5 100644
--- a/python/pyspark/pandas/tests/test_csv.py
+++ b/python/pyspark/pandas/tests/test_csv.py
@@ -255,24 +255,6 @@ def test_read_csv_with_sep(self):
             actual = ps.read_csv(fn, sep="\t")
             self.assert_eq(expected, actual, almost=True)

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43563): Enable CsvTests.test_read_csv_with_squeeze for pandas 2.0.0.",
-    )
-    def test_read_csv_with_squeeze(self):
-        with self.csv_file(self.csv_text) as fn:
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name"])
-            self.assert_eq(expected, actual, almost=True)
-
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name", "amount"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"])
-            self.assert_eq(expected, actual, almost=True)
-
-            expected = pd.read_csv(fn, squeeze=True, usecols=["name", "amount"], index_col=["name"])
-            actual = ps.read_csv(fn, squeeze=True, usecols=["name", "amount"], index_col=["name"])
-            self.assert_eq(expected, actual, almost=True)
-
     def test_read_csv_with_mangle_dupe_cols(self):
         self.assertRaisesRegex(
             ValueError, "mangle_dupe_cols", lambda: ps.read_csv("path", mangle_dupe_cols=False)
1, 1, 1, 2], "d": list("abcdefght")}), @@ -71,7 +66,7 @@ def sort(df): self.assert_eq( sort(psdf1.groupby(psdf2.a, as_index=as_index).sum()), - sort(pdf1.groupby(pdf2.a, as_index=as_index).sum()), + sort(pdf1.groupby(pdf2.a, as_index=as_index).sum(numeric_only=True)), almost=as_index, ) @@ -86,11 +81,6 @@ def sort(df): almost=as_index, ) - @unittest.skipIf( - LooseVersion(pd.__version__) >= LooseVersion("2.0.0"), - "TODO(SPARK-43459): Enable OpsOnDiffFramesGroupByTests.test_groupby_multiindex_columns " - "for pandas 2.0.0.", - ) def test_groupby_multiindex_columns(self): pdf1 = pd.DataFrame( {("y", "c"): [4, 2, 7, 3, None, 1, 1, 1, 2], ("z", "d"): list("abcdefght")} @@ -103,7 +93,7 @@ def test_groupby_multiindex_columns(self): self.assert_eq( psdf1.groupby(psdf2[("x", "a")]).sum().sort_index(), - pdf1.groupby(pdf2[("x", "a")]).sum().sort_index(), + pdf1.groupby(pdf2[("x", "a")]).sum(numeric_only=True).sort_index(), ) self.assert_eq( @@ -112,7 +102,7 @@ def test_groupby_multiindex_columns(self): .sort_values(("y", "c")) .reset_index(drop=True), pdf1.groupby(pdf2[("x", "a")], as_index=False) - .sum() + .sum(numeric_only=True) .sort_values(("y", "c")) .reset_index(drop=True), ) diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py index 17e2bb82bd548..1e5e11637e587 100644 --- a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py +++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py @@ -72,12 +72,35 @@ def _test_groupby_rolling_func(self, f): getattr(pdf.groupby(pkey)[["b"]].rolling(2), f)().sort_index(), ) - @unittest.skipIf( - LooseVersion(pd.__version__) >= LooseVersion("2.0.0"), - "TODO(SPARK-43452): Enable RollingTests.test_groupby_rolling_count for pandas 2.0.0.", - ) def test_groupby_rolling_count(self): - self._test_groupby_rolling_func("count") + pser = pd.Series([1, 2, 3], name="a") + pkey = pd.Series([1, 2, 3], name="a") + psser = ps.from_pandas(pser) + kkey = ps.from_pandas(pkey) + + # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work same as pandas + self.assert_eq( + psser.groupby(kkey).rolling(2).count().sort_index(), + pser.groupby(pkey).rolling(2, min_periods=1).count().sort_index(), + ) + + pdf = pd.DataFrame({"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0]}) + pkey = pd.Series([1, 2, 3, 2], name="a") + psdf = ps.from_pandas(pdf) + kkey = ps.from_pandas(pkey) + + self.assert_eq( + psdf.groupby(kkey).rolling(2).count().sort_index(), + pdf.groupby(pkey).rolling(2, min_periods=1).count().sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)["b"].rolling(2).count().sort_index(), + pdf.groupby(pkey)["b"].rolling(2, min_periods=1).count().sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)[["b"]].rolling(2).count().sort_index(), + pdf.groupby(pkey)[["b"]].rolling(2, min_periods=1).count().sort_index(), + ) def test_groupby_rolling_min(self): self._test_groupby_rolling_func("min") diff --git a/python/pyspark/pandas/tests/test_rolling.py b/python/pyspark/pandas/tests/test_rolling.py index 00b9de8a47890..526962e3bbdd2 100644 --- a/python/pyspark/pandas/tests/test_rolling.py +++ b/python/pyspark/pandas/tests/test_rolling.py @@ -86,12 +86,34 @@ def test_rolling_quantile(self): def test_rolling_sum(self): self._test_rolling_func("sum") - @unittest.skipIf( - LooseVersion(pd.__version__) >= LooseVersion("2.0.0"), - "TODO(SPARK-43451): Enable RollingTests.test_rolling_count for pandas 2.0.0.", - ) def test_rolling_count(self): - 
diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py
index 17e2bb82bd548..1e5e11637e587 100644
--- a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py
+++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby_rolling.py
@@ -72,12 +72,35 @@ def _test_groupby_rolling_func(self, f):
             getattr(pdf.groupby(pkey)[["b"]].rolling(2), f)().sort_index(),
         )

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43452): Enable RollingTests.test_groupby_rolling_count for pandas 2.0.0.",
-    )
     def test_groupby_rolling_count(self):
-        self._test_groupby_rolling_func("count")
+        pser = pd.Series([1, 2, 3], name="a")
+        pkey = pd.Series([1, 2, 3], name="a")
+        psser = ps.from_pandas(pser)
+        kkey = ps.from_pandas(pkey)
+
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work same as pandas
+        self.assert_eq(
+            psser.groupby(kkey).rolling(2).count().sort_index(),
+            pser.groupby(pkey).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        pdf = pd.DataFrame({"a": [1, 2, 3, 2], "b": [4.0, 2.0, 3.0, 1.0]})
+        pkey = pd.Series([1, 2, 3, 2], name="a")
+        psdf = ps.from_pandas(pdf)
+        kkey = ps.from_pandas(pkey)
+
+        self.assert_eq(
+            psdf.groupby(kkey).rolling(2).count().sort_index(),
+            pdf.groupby(pkey).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(kkey)["b"].rolling(2).count().sort_index(),
+            pdf.groupby(pkey)["b"].rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(kkey)[["b"]].rolling(2).count().sort_index(),
+            pdf.groupby(pkey)[["b"]].rolling(2, min_periods=1).count().sort_index(),
+        )

     def test_groupby_rolling_min(self):
         self._test_groupby_rolling_func("min")
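Note (illustration only, not part of the diff): the rewritten count tests pin ``min_periods=1`` on the pandas side because pandas 2.x made ``Rolling.count`` honor the default ``min_periods`` (the window size), while ``Rolling.count`` in pandas API on Spark still behaves like ``min_periods=1`` (the gap tracked by SPARK-43432). A small sketch of the difference, using plain pandas 2.x:

    import pandas as pd

    pser = pd.Series([1, 2, 3], name="a")

    print(pser.rolling(2).count())                 # pandas 2.x: NaN, 2.0, 2.0 -- the first window is incomplete
    print(pser.rolling(2, min_periods=1).count())  # 1.0, 2.0, 2.0 -- what the tests now compare against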
diff --git a/python/pyspark/pandas/tests/test_rolling.py b/python/pyspark/pandas/tests/test_rolling.py
index 00b9de8a47890..526962e3bbdd2 100644
--- a/python/pyspark/pandas/tests/test_rolling.py
+++ b/python/pyspark/pandas/tests/test_rolling.py
@@ -86,12 +86,34 @@ def test_rolling_quantile(self):
     def test_rolling_sum(self):
         self._test_rolling_func("sum")

-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43451): Enable RollingTests.test_rolling_count for pandas 2.0.0.",
-    )
     def test_rolling_count(self):
-        self._test_rolling_func("count")
+        pser = pd.Series([1, 2, 3, 7, 9, 8], index=np.random.rand(6), name="a")
+        psser = ps.from_pandas(pser)
+        self.assert_eq(psser.rolling(2).count(), pser.rolling(2, min_periods=1).count())
+        self.assert_eq(psser.rolling(2).count().sum(), pser.rolling(2, min_periods=1).count().sum())
+
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work same as pandas
+        # Multiindex
+        pser = pd.Series(
+            [1, 2, 3],
+            index=pd.MultiIndex.from_tuples([("a", "x"), ("a", "y"), ("b", "z")]),
+            name="a",
+        )
+        psser = ps.from_pandas(pser)
+        self.assert_eq(psser.rolling(2).count(), pser.rolling(2, min_periods=1).count())
+
+        pdf = pd.DataFrame(
+            {"a": [1.0, 2.0, 3.0, 2.0], "b": [4.0, 2.0, 3.0, 1.0]}, index=np.random.rand(4)
+        )
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(psdf.rolling(2).count(), pdf.rolling(2, min_periods=1).count())
+        self.assert_eq(psdf.rolling(2).count().sum(), pdf.rolling(2, min_periods=1).count().sum())
+
+        # Multiindex column
+        columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "y")])
+        pdf.columns = columns
+        psdf.columns = columns
+        self.assert_eq(psdf.rolling(2).count(), pdf.rolling(2, min_periods=1).count())

     def test_rolling_std(self):
         self._test_rolling_func("std")
@@ -138,33 +160,18 @@ def _test_groupby_rolling_func(self, ps_func, pd_func=None):
         pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 2.0], "b": [4.0, 2.0, 3.0, 1.0]})
         psdf = ps.from_pandas(pdf)

-        # The behavior of GroupBy.rolling is changed from pandas 1.3.
-        if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).sort_index(),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sum(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).sum(),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a + 1).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a + 1).rolling(2)).sort_index(),
-            )
-        else:
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).drop("a", axis=1).sort_index(),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a).rolling(2)).sum(),
-                pd_func(pdf.groupby(pdf.a).rolling(2)).sum().drop("a"),
-            )
-            self.assert_eq(
-                ps_func(psdf.groupby(psdf.a + 1).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(pdf.a + 1).rolling(2)).drop("a", axis=1).sort_index(),
-            )
+        self.assert_eq(
+            ps_func(psdf.groupby(psdf.a).rolling(2)).sort_index(),
+            pd_func(pdf.groupby(pdf.a).rolling(2)).sort_index(),
+        )
+        self.assert_eq(
+            ps_func(psdf.groupby(psdf.a).rolling(2)).sum(),
+            pd_func(pdf.groupby(pdf.a).rolling(2)).sum(),
+        )
+        self.assert_eq(
+            ps_func(psdf.groupby(psdf.a + 1).rolling(2)).sort_index(),
+            pd_func(pdf.groupby(pdf.a + 1).rolling(2)).sort_index(),
+        )

         self.assert_eq(
             ps_func(psdf.b.groupby(psdf.a).rolling(2)).sort_index(),
@@ -184,36 +191,84 @@ def _test_groupby_rolling_func(self, ps_func, pd_func=None):
         pdf.columns = columns
         psdf.columns = columns

-        # The behavior of GroupBy.rolling is changed from pandas 1.3.
-        if LooseVersion(pd.__version__) >= LooseVersion("1.3"):
-            self.assert_eq(
-                ps_func(psdf.groupby(("a", "x")).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(("a", "x")).rolling(2)).sort_index(),
-            )
-
-            self.assert_eq(
-                ps_func(psdf.groupby([("a", "x"), ("a", "y")]).rolling(2)).sort_index(),
-                pd_func(pdf.groupby([("a", "x"), ("a", "y")]).rolling(2)).sort_index(),
-            )
-        else:
-            self.assert_eq(
-                ps_func(psdf.groupby(("a", "x")).rolling(2)).sort_index(),
-                pd_func(pdf.groupby(("a", "x")).rolling(2)).drop(("a", "x"), axis=1).sort_index(),
-            )
-
-            self.assert_eq(
-                ps_func(psdf.groupby([("a", "x"), ("a", "y")]).rolling(2)).sort_index(),
-                pd_func(pdf.groupby([("a", "x"), ("a", "y")]).rolling(2))
-                .drop([("a", "x"), ("a", "y")], axis=1)
-                .sort_index(),
-            )
-
-    @unittest.skipIf(
-        LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-        "TODO(SPARK-43452): Enable RollingTests.test_groupby_rolling_count for pandas 2.0.0.",
-    )
+        self.assert_eq(
+            ps_func(psdf.groupby(("a", "x")).rolling(2)).sort_index(),
+            pd_func(pdf.groupby(("a", "x")).rolling(2)).sort_index(),
+        )
+
+        self.assert_eq(
+            ps_func(psdf.groupby([("a", "x"), ("a", "y")]).rolling(2)).sort_index(),
+            pd_func(pdf.groupby([("a", "x"), ("a", "y")]).rolling(2)).sort_index(),
+        )
+
     def test_groupby_rolling_count(self):
-        self._test_groupby_rolling_func("count")
+        pser = pd.Series([1, 2, 3, 2], index=np.random.rand(4), name="a")
+        psser = ps.from_pandas(pser)
+        # TODO(SPARK-43432): Fix `min_periods` for Rolling.count() to work same as pandas
+        self.assert_eq(
+            psser.groupby(psser).rolling(2).count().sort_index(),
+            pser.groupby(pser).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psser.groupby(psser).rolling(2).count().sum(),
+            pser.groupby(pser).rolling(2, min_periods=1).count().sum(),
+        )
+
+        # Multiindex
+        pser = pd.Series(
+            [1, 2, 3, 2],
+            index=pd.MultiIndex.from_tuples([("a", "x"), ("a", "y"), ("b", "z"), ("c", "z")]),
+            name="a",
+        )
+        psser = ps.from_pandas(pser)
+        self.assert_eq(
+            psser.groupby(psser).rolling(2).count().sort_index(),
+            pser.groupby(pser).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        pdf = pd.DataFrame({"a": [1.0, 2.0, 3.0, 2.0], "b": [4.0, 2.0, 3.0, 1.0]})
+        psdf = ps.from_pandas(pdf)
+
+        self.assert_eq(
+            psdf.groupby(psdf.a).rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a).rolling(2).count().sum(),
+            pdf.groupby(pdf.a).rolling(2, min_periods=1).count().sum(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a + 1).rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a + 1).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        self.assert_eq(
+            psdf.b.groupby(psdf.a).rolling(2).count().sort_index(),
+            pdf.b.groupby(pdf.a).rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a)["b"].rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a)["b"].rolling(2, min_periods=1).count().sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(psdf.a)[["b"]].rolling(2).count().sort_index(),
+            pdf.groupby(pdf.a)[["b"]].rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        # Multiindex column
+        columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "y")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.groupby(("a", "x")).rolling(2).count().sort_index(),
+            pdf.groupby(("a", "x")).rolling(2, min_periods=1).count().sort_index(),
+        )
+
+        self.assert_eq(
+            psdf.groupby([("a", "x"), ("a", "y")]).rolling(2).count().sort_index(),
+            pdf.groupby([("a", "x"), ("a", "y")]).rolling(2, min_periods=1).count().sort_index(),
+        )

     def test_groupby_rolling_min(self):
         self._test_groupby_rolling_func("min")
pdf.groupby([("a", "x"), ("a", "y")]).rolling(2, min_periods=1).count().sort_index(), + ) def test_groupby_rolling_min(self): self._test_groupby_rolling_func("min") diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py index 5f76cafb1927f..258502baa4d5a 100644 --- a/python/pyspark/sql/tests/connect/test_parity_arrow.py +++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py @@ -16,9 +16,6 @@ # import unittest -from distutils.version import LooseVersion - -import pandas as pd from pyspark.sql.tests.test_arrow import ArrowTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase @@ -123,10 +120,6 @@ def test_toPandas_duplicate_field_names(self): def test_createDataFrame_duplicate_field_names(self): self.check_createDataFrame_duplicate_field_names(True) - @unittest.skipIf( - LooseVersion(pd.__version__) >= LooseVersion("2.0.0"), - "TODO(SPARK-43506): Enable ArrowTests.test_toPandas_empty_columns for pandas 2.0.0.", - ) def test_toPandas_empty_columns(self): self.check_toPandas_empty_columns(True) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index ac45c4c565fa6..e26aabbea2715 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -1015,10 +1015,6 @@ def check_createDataFrame_duplicate_field_names(self, arrow_enabled): self.assertEqual(df.collect(), data) - @unittest.skipIf( - LooseVersion(pd.__version__) >= LooseVersion("2.0.0"), - "TODO(SPARK-43506): Enable ArrowTests.test_toPandas_empty_columns for pandas 2.0.0.", - ) def test_toPandas_empty_columns(self): for arrow_enabled in [True, False]: with self.subTest(arrow_enabled=arrow_enabled):