[SPARK-48798][PYTHON] Introduce spark.profile.render for SparkSession-based profiling #47202

Closed · wants to merge 4 commits
1 change: 1 addition & 0 deletions dev/requirements.txt
@@ -62,6 +62,7 @@ grpc-stubs==1.24.11

# Debug for Spark and Spark Connect
graphviz==0.20.3
flameprof==0.4

# TorchDistributor dependencies
torch
Binary file added docs/img/pyspark-udf-profile.png
29 changes: 29 additions & 0 deletions python/docs/source/development/debugging.rst
@@ -263,6 +263,16 @@ The UDF IDs can be seen in the query plan, for example, ``add1(...)#2L`` in ``Ar
+- ArrowEvalPython [add1(id#0L)#2L], [pythonUDF0#11L], 200
+- *(1) Range (0, 10, step=1, splits=16)

We can render the result with an arbitrary renderer function as shown below.

.. code-block:: python

    def do_render(codemap):
        # Your custom rendering logic
        ...

    spark.profile.render(id=2, type="memory", renderer=do_render)
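
For instance, a minimal custom renderer might summarize the peak memory usage per profiled file. This is only a sketch, assuming the code map is a dictionary mapping filenames to lists of ``(lineno, (increment, mem_usage, occurrences))`` entries, as produced by ``memory-profiler``:

.. code-block:: python

    def do_render(codemap):
        # Summarize the peak memory usage recorded for each profiled file.
        for filename, lines in codemap.items():
            peak = max((mem[1] for _, mem in lines if mem is not None), default=0.0)
            print(f"{filename}: peak {peak:.1f} MiB")

    spark.profile.render(id=2, type="memory", renderer=do_render)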

We can clear the result memory profile as shown below.

.. code-block:: python
@@ -358,6 +368,25 @@ The UDF IDs can be seen in the query plan, for example, ``add1(...)#2L`` in ``Ar
+- ArrowEvalPython [add1(id#0L)#2L], [pythonUDF0#11L], 200
+- *(1) Range (0, 10, step=1, splits=16)

We can render the result with a preregistered renderer as shown below.

.. code-block:: python

>>> spark.profile.render(id=2, type="perf") # renderer="flameprof" by default

.. image:: ../../../../docs/img/pyspark-udf-profile.png
    :alt: PySpark UDF profile

Or with an arbitrary renderer function as shown below.

.. code-block:: python

    >>> def do_render(stats):
    ...     # Your custom rendering logic
    ...     ...
    ...
    >>> spark.profile.render(id=2, type="perf", renderer=do_render)
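
As a concrete sketch using only the standard ``pstats`` API, a custom renderer could print the ten most expensive functions by cumulative time:

.. code-block:: python

    >>> def do_render(stats):
    ...     # Sort the profile by cumulative time and print the top 10 entries.
    ...     stats.sort_stats("cumulative").print_stats(10)
    ...
    >>> spark.profile.render(id=2, type="perf", renderer=do_render)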

We can clear the result performance profile as shown below.

.. code-block:: python
4 changes: 4 additions & 0 deletions python/docs/source/getting_started/install.rst
@@ -225,6 +225,10 @@ Package Supported version Note
`pyarrow` >=10.0.0 Required for Spark SQL
========= ================= ======================

Additional libraries that enhance functionality but are not included in the installation packages:

- **flameprof**: Provides the default renderer for UDF performance profiling (installable from PyPI via ``pip install flameprof``).


Pandas API on Spark
^^^^^^^^^^^^^^^^^^^
3 changes: 3 additions & 0 deletions python/mypy.ini
@@ -174,6 +174,9 @@ ignore_missing_imports = True
[mypy-memory_profiler.*]
ignore_missing_imports = True

[mypy-flameprof.*]
ignore_missing_imports = True

; Ignore errors for proto generated code
[mypy-pyspark.sql.connect.proto.*, pyspark.sql.connect.proto]
ignore_errors = True
115 changes: 114 additions & 1 deletion python/pyspark/sql/profiler.py
@@ -15,10 +15,11 @@
# limitations under the License.
#
from abc import ABC, abstractmethod
from io import StringIO
import os
import pstats
from threading import RLock
from typing import Dict, Optional, TYPE_CHECKING
from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union, TYPE_CHECKING, overload

from pyspark.accumulators import (
    Accumulator,
@@ -360,6 +361,82 @@ def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = Non
            },
        )

    @overload
    def render(self, id: int, *, type: Optional[str] = None, renderer: Optional[str] = None) -> Any:
        ...

    @overload
    def render(
        self, id: int, *, type: Optional[Literal["perf"]], renderer: Callable[[pstats.Stats], Any]
    ) -> Any:
        ...

    @overload
    def render(
        self, id: int, *, type: Literal["memory"], renderer: Callable[[CodeMapDict], Any]
    ) -> Any:
        ...

    def render(
        self,
        id: int,
        *,
        type: Optional[str] = None,
        renderer: Optional[
            Union[str, Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
        ] = None,
    ) -> Any:
"""
Render the profile results.

.. versionadded:: 4.0.0

Parameters
----------
id : int
The UDF ID whose profiling results should be rendered.
type : str, optional
The profiler type to clear results for, which can be either "perf" or "memory".
renderer : str or callable, optional
The renderer to use. If not specified, the default renderer will be "flameprof"
ueshin marked this conversation as resolved.
Show resolved Hide resolved
for "perf" profiler. For "memory" profiler, the default renderer is not provided.

If a callable is provided, it should take a `pstats.Stats` object for "perf" profiler,
and `CodeMapDict` for "memory" profiler, and return the rendered result.
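
        Examples
        --------
        >>> spark.profile.render(id=2, type="perf")  # doctest: +SKIP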
"""
        result: Optional[Union[pstats.Stats, CodeMapDict]]
        if type is None:
            # The profiler type defaults to "perf".
            type = "perf"
        if type == "perf":
            result = self.profiler_collector._perf_profile_results.get(id)
        elif type == "memory":
            result = self.profiler_collector._memory_profile_results.get(id)
        else:
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )

        render: Optional[Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]] = None
        if renderer is None or isinstance(renderer, str):
            # Look up a preregistered renderer by its (type, name) key.
            render = _renderers.get((type, renderer))
        elif callable(renderer):
            render = renderer
        if render is None:
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "(type, renderer)",
                    "allowed_values": str(list(_renderers.keys())),
                },
            )

        if result is not None:
            return render(result)  # type: ignore[arg-type, misc]

    def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Clear the profile results.
@@ -388,3 +465,39 @@ def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None
                    "allowed_values": str(["perf", "memory"]),
                },
            )


def _render_flameprof(stats: pstats.Stats) -> Any:
    try:
        from flameprof import render
    except ImportError:
        raise PySparkValueError(
            error_class="PACKAGE_NOT_INSTALLED",
            message_parameters={"package_name": "flameprof", "minimum_version": "0.4"},
        )

    # Render the raw pstats data into an SVG flame graph.
    buf = StringIO()
    render(stats.stats, buf)  # type: ignore[attr-defined]
    svg = buf.getvalue()

    try:
        import IPython

        ipython = IPython.get_ipython()
    except ImportError:
        ipython = None

    if ipython:
        # Display the SVG inline when running in IPython/Jupyter.
        from IPython.display import HTML

        return HTML(svg)
    else:
        return svg


_renderers: Dict[
    Tuple[str, Optional[str]], Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
] = {
    ("perf", None): _render_flameprof,
    ("perf", "flameprof"): _render_flameprof,
}
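
For reference, an illustrative sketch of how this registry resolves the ``renderer`` argument in ``render`` above:

.. code-block:: python

    _renderers.get(("perf", None))         # default renderer: _render_flameprof
    _renderers.get(("perf", "flameprof"))  # explicit name: _render_flameprof
    _renderers.get(("memory", None))       # None, so render raises VALUE_NOT_ALLOWED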
9 changes: 8 additions & 1 deletion python/pyspark/sql/tests/connect/test_parity_udf_profiler.py
@@ -18,7 +18,11 @@
import os
import unittest

from pyspark.sql.tests.test_udf_profiler import UDFProfiler2TestsMixin, _do_computation
from pyspark.sql.tests.test_udf_profiler import (
    UDFProfiler2TestsMixin,
    _do_computation,
    has_flameprof,
)
from pyspark.testing.connectutils import ReusedConnectTestCase


@@ -61,6 +65,9 @@ def action(df):
            io.getvalue(), f"10.*{os.path.basename(inspect.getfile(_do_computation))}"
        )

        if has_flameprof:
            # The default flameprof renderer returns an SVG document.
            self.assertIn("svg", self.spark.profile.render(id))


if __name__ == "__main__":
    from pyspark.sql.tests.connect.test_parity_udf_profiler import *  # noqa: F401