From 48152b1779a5b8191dd0e09424fdb552cac55d49 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 16 Jan 2024 11:20:40 -0800 Subject: [PATCH] [SPARK-46663][PYTHON] Disable memory profiler for pandas UDFs with iterators ### What changes were proposed in this pull request? When using pandas UDFs with iterators, if users enable the profiling spark conf, a warning indicating non-support should be raised, and profiling should be disabled. However, currently, after raising the not-supported warning, the memory profiler is still being enabled. The PR proposed to fix that. ### Why are the changes needed? A bug fix to eliminate misleading behavior. ### Does this PR introduce _any_ user-facing change? The noticeable changes will affect only those using the PySpark shell. This is because, in the PySpark shell, the memory profiler will raise an error, which in turn blocks the execution of the UDF. ### How was this patch tested? Manual test. ### Was this patch authored or co-authored using generative AI tooling? Setup: ```py $ ./bin/pyspark --conf spark.python.profile=true >>> from typing import Iterator >>> from pyspark.sql.functions import * >>> import pandas as pd >>> pandas_udf("long") ... def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: ... for s in iterator: ... yield s + 1 ... >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"])) ``` Before: ``` >>> df.select(plus_one(df.v)).show() UserWarning: Profiling UDFs with iterators input/output is not supported. Traceback (most recent call last): ... OSError: could not get source code ``` After: ``` >>> df.select(plus_one(df.v)).show() /Users/xinrong.meng/spark/python/pyspark/sql/udf.py:417: UserWarning: Profiling UDFs with iterators input/output is not supported. +-----------+ |plus_one(v)| +-----------+ | 2| | 3| | 4| +-----------+ ``` Closes #44668 from xinrong-meng/fix_mp. Authored-by: Xinrong Meng Signed-off-by: Xinrong Meng --- python/pyspark/sql/tests/test_udf_profiler.py | 45 ++++++++++++++++++- python/pyspark/sql/udf.py | 33 +++++++------- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/tests/test_udf_profiler.py b/python/pyspark/sql/tests/test_udf_profiler.py index 136f423d0a35c..776d5da88bb27 100644 --- a/python/pyspark/sql/tests/test_udf_profiler.py +++ b/python/pyspark/sql/tests/test_udf_profiler.py @@ -19,11 +19,13 @@ import unittest import os import sys +import warnings from io import StringIO +from typing import Iterator from pyspark import SparkConf from pyspark.sql import SparkSession -from pyspark.sql.functions import udf +from pyspark.sql.functions import udf, pandas_udf from pyspark.profiler import UDFBasicProfiler @@ -101,6 +103,47 @@ def add2(x): df = self.spark.range(10) df.select(add1("id"), add2("id"), add1("id")).collect() + # Unsupported + def exec_pandas_udf_iter_to_iter(self): + import pandas as pd + + @pandas_udf("int") + def iter_to_iter(batch_ser: Iterator[pd.Series]) -> Iterator[pd.Series]: + for ser in batch_ser: + yield ser + 1 + + self.spark.range(10).select(iter_to_iter("id")).collect() + + # Unsupported + def exec_map(self): + import pandas as pd + + def map(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]: + for pdf in pdfs: + yield pdf[pdf.id == 1] + + df = self.spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v")) + df.mapInPandas(map, schema=df.schema).collect() + + def test_unsupported(self): + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter("always") + self.exec_pandas_udf_iter_to_iter() + user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0]) + ) + + with warnings.catch_warnings(record=True) as warns: + warnings.simplefilter("always") + self.exec_map() + user_warns = [warn.message for warn in warns if isinstance(warn.message, UserWarning)] + self.assertTrue(len(user_warns) > 0) + self.assertTrue( + "Profiling UDFs with iterators input/output is not supported" in str(user_warns[0]) + ) + if __name__ == "__main__": from pyspark.sql.tests.test_udf_profiler import * # noqa: F401 diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 16605bc12acc7..ca38556431ad9 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -28,7 +28,6 @@ from py4j.java_gateway import JavaObject from pyspark import SparkContext -from pyspark.profiler import Profiler from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType from pyspark.sql.column import Column, _to_java_expr, _to_seq from pyspark.sql.types import ( @@ -403,24 +402,24 @@ def __call__(self, *args: "ColumnOrName", **kwargs: "ColumnOrName") -> Column: for key, value in kwargs.items() ] - profiler: Optional[Profiler] = None - memory_profiler: Optional[Profiler] = None - if sc.profiler_collector: - profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true" - memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" + profiler_enabled = sc._conf.get("spark.python.profile", "false") == "true" + memory_profiler_enabled = sc._conf.get("spark.python.profile.memory", "false") == "true" + if profiler_enabled or memory_profiler_enabled: # Disable profiling Pandas UDFs with iterators as input/output. - if profiler_enabled or memory_profiler_enabled: - if self.evalType in [ - PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, - PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, - PythonEvalType.SQL_MAP_ARROW_ITER_UDF, - ]: - profiler_enabled = memory_profiler_enabled = False - warnings.warn( - "Profiling UDFs with iterators input/output is not supported.", - UserWarning, - ) + if self.evalType in [ + PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, + PythonEvalType.SQL_MAP_ARROW_ITER_UDF, + ]: + warnings.warn( + "Profiling UDFs with iterators input/output is not supported.", + UserWarning, + ) + judf = self._judf + jUDFExpr = judf.builder(_to_seq(sc, jexprs)) + jPythonUDF = judf.fromUDFExpr(jUDFExpr) + return Column(jPythonUDF) # Disallow enabling two profilers at the same time. if profiler_enabled and memory_profiler_enabled: