[SPARK-48798][PYTHON] Introduce spark.profile.render for SparkSession-based profiling

### What changes were proposed in this pull request?

Introduces `spark.profile.render` for SparkSession-based profiling.

It uses [`flameprof`](https://github.com/baverman/flameprof/) for the default renderer.

```sh
$ pip install flameprof
```

Run `pyspark` in a Jupyter notebook:

```py
from pyspark.sql.functions import pandas_udf

spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")

df = spark.range(10)

@pandas_udf("long")
def add1(x):
    return x + 1

added = df.select(add1("id"))
added.show()

spark.profile.render(id=2)
```

<img width="1103" alt="pyspark-udf-profile" src="https://github.com/apache/spark/assets/506656/795972e8-f7eb-4b89-89fc-3d8d18b86541">

On the CLI, it returns the SVG source string.

```py
'<?xml version="1.0" standalone="no"?>\n<!DOCTYPE svg  ...
```
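Since the returned value in that case is plain SVG markup, it can be written out and opened in a browser. A trivial sketch with stub content (the real string would come from `spark.profile.render`; the file name is arbitrary):

```python
import os
import tempfile

# Stub SVG standing in for the string returned by spark.profile.render on the CLI.
svg = '<?xml version="1.0" standalone="no"?>\n<svg xmlns="http://www.w3.org/2000/svg"/>'

# Write the source to a file that a browser can open.
path = os.path.join(tempfile.mkdtemp(), "udf_profile.svg")
with open(path, "w") as f:
    f.write(svg)

with open(path) as f:
    saved = f.read()
print(path)
```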

Currently, `renderer="flameprof"` for `type="perf"` is the only supported built-in renderer.

You can also pass an arbitrary renderer.

```py
def render_perf(stats):
    ...
spark.profile.render(id=2, type="perf", renderer=render_perf)

def render_memory(codemap):
    ...
spark.profile.render(id=2, type="memory", renderer=render_memory)
```
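A custom `renderer` for the "perf" type receives a `pstats.Stats` object. The sketch below (names hypothetical, demonstrated on a locally collected cProfile run rather than a Spark UDF) returns the hottest entries as plain text instead of a flame graph:

```python
import cProfile
import io
import pstats


def render_perf_text(stats: pstats.Stats) -> str:
    # Hypothetical renderer: format the top entries as plain text,
    # sorted by cumulative time.
    buf = io.StringIO()
    stats.stream = buf  # pstats prints to self.stream
    stats.sort_stats("cumulative").print_stats(5)
    return buf.getvalue()


# Demonstrate on a locally collected profile; against Spark it would be
# passed as spark.profile.render(id=2, type="perf", renderer=render_perf_text).
profiler = cProfile.Profile()
profiler.enable()
sum(i * i for i in range(10_000))
profiler.disable()

text = render_perf_text(pstats.Stats(profiler))
print(text)
```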

### Why are the changes needed?

Better debuggability.

### Does this PR introduce _any_ user-facing change?

Yes, `spark.profile.render` will be available.

### How was this patch tested?

Added/updated the related tests, and tested manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47202 from ueshin/issues/SPARK-48798/render.

Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Takuya Ueshin <ueshin@databricks.com>
ueshin committed Jul 8, 2024
1 parent c4085f1 commit b062d44
Showing 9 changed files with 267 additions and 4 deletions.
1 change: 1 addition & 0 deletions dev/requirements.txt
@@ -62,6 +62,7 @@ grpc-stubs==1.24.11

# Debug for Spark and Spark Connect
graphviz==0.20.3
flameprof==0.4

# TorchDistributor dependencies
torch
Binary file added docs/img/pyspark-udf-profile.png
29 changes: 29 additions & 0 deletions python/docs/source/development/debugging.rst
@@ -263,6 +263,16 @@ The UDF IDs can be seen in the query plan, for example, ``add1(...)#2L`` in ``Ar
+- ArrowEvalPython [add1(id#0L)#2L], [pythonUDF0#11L], 200
+- *(1) Range (0, 10, step=1, splits=16)
We can render the result with an arbitrary renderer function as shown below.

.. code-block:: python

    def do_render(codemap):
        # Your custom rendering logic
        ...

    spark.profile.render(id=2, type="memory", renderer=do_render)

We can clear the result memory profile as shown below.

.. code-block:: python
@@ -358,6 +368,25 @@ The UDF IDs can be seen in the query plan, for example, ``add1(...)#2L`` in ``Ar
+- ArrowEvalPython [add1(id#0L)#2L], [pythonUDF0#11L], 200
+- *(1) Range (0, 10, step=1, splits=16)
We can render the result with a preregistered renderer as shown below.

.. code-block:: python

    >>> spark.profile.render(id=2, type="perf")  # renderer="flameprof" by default

.. image:: ../../../../docs/img/pyspark-udf-profile.png
    :alt: PySpark UDF profile

Or with an arbitrary renderer function as shown below.

.. code-block:: python

    >>> def do_render(stats):
    ...     # Your custom rendering logic
    ...     ...
    ...
    >>> spark.profile.render(id=2, type="perf", renderer=do_render)

We can clear the result performance profile as shown below.

.. code-block:: python
4 changes: 4 additions & 0 deletions python/docs/source/getting_started/install.rst
@@ -225,6 +225,10 @@ Package Supported version Note
`pyarrow` >=10.0.0 Required for Spark SQL
========= ================= ======================

Additional libraries that enhance functionality but are not included in the installation packages:

- **flameprof**: Provides the default renderer for UDF performance profiling.


Pandas API on Spark
^^^^^^^^^^^^^^^^^^^
3 changes: 3 additions & 0 deletions python/mypy.ini
@@ -174,6 +174,9 @@ ignore_missing_imports = True
[mypy-memory_profiler.*]
ignore_missing_imports = True

[mypy-flameprof.*]
ignore_missing_imports = True

; Ignore errors for proto generated code
[mypy-pyspark.sql.connect.proto.*, pyspark.sql.connect.proto]
ignore_errors = True
117 changes: 116 additions & 1 deletion python/pyspark/sql/profiler.py
@@ -15,10 +15,11 @@
# limitations under the License.
#
from abc import ABC, abstractmethod
from io import StringIO
import os
import pstats
from threading import RLock
from typing import Dict, Optional, TYPE_CHECKING
from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union, TYPE_CHECKING, overload

from pyspark.accumulators import (
Accumulator,
@@ -360,6 +361,84 @@ def dump(self, path: str, id: Optional[int] = None, *, type: Optional[str] = Non
            },
        )

    @overload
    def render(self, id: int, *, type: Optional[str] = None, renderer: Optional[str] = None) -> Any:
        ...

    @overload
    def render(
        self, id: int, *, type: Optional[Literal["perf"]], renderer: Callable[[pstats.Stats], Any]
    ) -> Any:
        ...

    @overload
    def render(
        self, id: int, *, type: Literal["memory"], renderer: Callable[[CodeMapDict], Any]
    ) -> Any:
        ...

    def render(
        self,
        id: int,
        *,
        type: Optional[str] = None,
        renderer: Optional[
            Union[str, Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
        ] = None,
    ) -> Any:
        """
        Render the profile results.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        id : int
            The UDF ID whose profiling results should be rendered.
        type : str, optional
            The profiler type to render results for, which can be either "perf" or "memory".
        renderer : str or callable, optional
            The renderer to use. If not specified, the default renderer will be "flameprof"
            for the "perf" profiler, which returns an :class:`IPython.display.HTML` object in
            an IPython environment to draw the figure; otherwise, it returns the SVG source
            string. For the "memory" profiler, no default renderer is provided.
            If a callable is provided, it should take a `pstats.Stats` object for the "perf"
            profiler, or a `CodeMapDict` for the "memory" profiler, and return the rendered
            result.
        """
        result: Optional[Union[pstats.Stats, CodeMapDict]]
        if type is None:
            type = "perf"
        if type == "perf":
            result = self.profiler_collector._perf_profile_results.get(id)
        elif type == "memory":
            result = self.profiler_collector._memory_profile_results.get(id)
        else:
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "type",
                    "allowed_values": str(["perf", "memory"]),
                },
            )

        render: Optional[Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]] = None
        if renderer is None or isinstance(renderer, str):
            render = _renderers.get((type, renderer))
        elif callable(renderer):
            render = renderer
        if render is None:
            raise PySparkValueError(
                error_class="VALUE_NOT_ALLOWED",
                message_parameters={
                    "arg_name": "(type, renderer)",
                    "allowed_values": str(list(_renderers.keys())),
                },
            )

        if result is not None:
            return render(result)  # type: ignore[arg-type]

    def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None:
        """
        Clear the profile results.

@@ -388,3 +467,39 @@ def clear(self, id: Optional[int] = None, *, type: Optional[str] = None) -> None
                "allowed_values": str(["perf", "memory"]),
            },
        )


def _render_flameprof(stats: pstats.Stats) -> Any:
    try:
        from flameprof import render
    except ImportError:
        raise PySparkValueError(
            error_class="PACKAGE_NOT_INSTALLED",
            message_parameters={"package_name": "flameprof", "minimum_version": "0.4"},
        )

    buf = StringIO()
    render(stats.stats, buf)  # type: ignore[attr-defined]
    svg = buf.getvalue()

    try:
        import IPython

        ipython = IPython.get_ipython()
    except ImportError:
        ipython = None

    if ipython:
        from IPython.display import HTML

        return HTML(svg)
    else:
        return svg


_renderers: Dict[
    Tuple[str, Optional[str]], Union[Callable[[pstats.Stats], Any], Callable[[CodeMapDict], Any]]
] = {
    ("perf", None): _render_flameprof,
    ("perf", "flameprof"): _render_flameprof,
}
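The `(type, renderer)` dispatch table can be exercised in isolation. A standalone sketch of the same lookup-and-fallback pattern (all names hypothetical, not part of the PR):

```python
from typing import Any, Callable, Dict, Optional, Tuple


def _render_text(stats: Any) -> str:
    # Hypothetical stand-in for a real renderer such as _render_flameprof.
    return f"rendered:{stats}"


# Dispatch table keyed on (profiler type, renderer name); None selects the default.
_example_renderers: Dict[Tuple[str, Optional[str]], Callable[[Any], Any]] = {
    ("perf", None): _render_text,
    ("perf", "text"): _render_text,
}


def resolve(type: str, renderer: Optional[str]) -> Callable[[Any], Any]:
    # Mirror the lookup in render(): unknown pairs are rejected up front.
    render = _example_renderers.get((type, renderer))
    if render is None:
        raise ValueError(f"no renderer registered for {(type, renderer)!r}")
    return render


out = resolve("perf", None)("stats")
```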
9 changes: 8 additions & 1 deletion python/pyspark/sql/tests/connect/test_parity_udf_profiler.py
@@ -18,7 +18,11 @@
import os
import unittest

from pyspark.sql.tests.test_udf_profiler import UDFProfiler2TestsMixin, _do_computation
from pyspark.sql.tests.test_udf_profiler import (
    UDFProfiler2TestsMixin,
    _do_computation,
    has_flameprof,
)
from pyspark.testing.connectutils import ReusedConnectTestCase


@@ -61,6 +65,9 @@ def action(df):
            io.getvalue(), f"10.*{os.path.basename(inspect.getfile(_do_computation))}"
        )

        if has_flameprof:
            self.assertIn("svg", self.spark.profile.render(id))


if __name__ == "__main__":
    from pyspark.sql.tests.connect.test_parity_udf_profiler import *  # noqa: F401
