[SPARK-46687][PYTHON][CONNECT] Basic support of SparkSession-based memory profiler

### What changes were proposed in this pull request?

Basic support of SparkSession-based memory profiler in both Spark Connect and non-Spark-Connect.

### Why are the changes needed?

We need to make the memory profiler SparkSession-based to support memory profiling in Spark Connect.

### Does this PR introduce _any_ user-facing change?

Yes, the SparkSession-based memory profiler is available.

An example is shown below:
```py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.taskcontext import TaskContext

spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")

udf("string")
def f(x):
  if TaskContext.get().partitionId() % 2 == 0:
    return str(x)
  else:
    return None

spark.range(10).select(f(col("id"))).show()

spark.showMemoryProfiles()
```
which shows the profile result:
```
============================================================
Profile of UDF<id=2>
============================================================
Filename: /var/folders/h_/60n1p_5s7751jx1st4_sk0780000gp/T/ipykernel_72839/2848225169.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     7    113.2 MiB    113.2 MiB          10   @udf("string")
     8                                         def f(x):
     9    114.4 MiB      1.3 MiB          10     if TaskContext.get().partitionId() % 2 == 0:
    10     31.8 MiB      0.1 MiB           4       return str(x)
    11                                           else:
    12     82.8 MiB      0.1 MiB           6       return None
```
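
The profiler can also be restricted to a single UDF by passing its id to `showMemoryProfiles`; the id matches the `UDF<id=...>` header in the output. A minimal sketch, using the id from this example run:
```py
# Show only the memory profile of a specific UDF; the id corresponds to the
# "Profile of UDF<id=2>" header in the output above.
spark.showMemoryProfiles(2)
```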

### How was this patch tested?

New and existing unit tests:
- pyspark.tests.test_memory_profiler
- pyspark.sql.tests.connect.test_parity_memory_profiler

And manual tests on Jupyter notebook.
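
For reference, a minimal sketch of running the new and updated suites locally with plain `unittest` (assumes a PySpark dev environment with the Spark Connect dependencies installed; `python/run-tests` is the usual entry point):
```py
# Hypothetical local invocation of the memory profiler test suites via unittest.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromNames([
    "pyspark.tests.test_memory_profiler",
    "pyspark.sql.tests.connect.test_parity_memory_profiler",
])
unittest.TextTestRunner(verbosity=2).run(suite)
```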

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#44775 from xinrong-meng/memory_profiler_v2.

Authored-by: Xinrong Meng <xinrong@apache.org>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
xinrong-meng authored and ueshin committed Jan 29, 2024
1 parent c468c3d commit 528ac8b
Showing 7 changed files with 368 additions and 25 deletions.
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -1002,6 +1002,7 @@ def __hash__(self):
"pyspark.sql.tests.connect.test_parity_readwriter",
"pyspark.sql.tests.connect.test_parity_udf",
"pyspark.sql.tests.connect.test_parity_udf_profiler",
"pyspark.sql.tests.connect.test_parity_memory_profiler",
"pyspark.sql.tests.connect.test_parity_udtf",
"pyspark.sql.tests.connect.test_parity_pandas_udf",
"pyspark.sql.tests.connect.test_parity_pandas_map",
75 changes: 56 additions & 19 deletions python/pyspark/profiler.py
@@ -19,6 +19,7 @@
Any,
Callable,
Dict,
Iterator,
List,
Optional,
Tuple,
@@ -37,7 +38,7 @@
import warnings

try:
from memory_profiler import choose_backend, CodeMap, LineProfiler # type: ignore[import]
from memory_profiler import CodeMap, LineProfiler # type: ignore[import]

has_memory_profiler = True
except Exception:
@@ -196,16 +197,40 @@ def add(
for subcode in filter(inspect.iscode, code.co_consts):
self.add(subcode, toplevel_code=toplevel_code)

class CodeMapForUDFV2(CodeMap):
def add(
self,
code: Any,
toplevel_code: Optional[Any] = None,
) -> None:
if code in self:
return

if toplevel_code is None:
toplevel_code = code
filename = code.co_filename
self._toplevel.append((filename, code))
self[code] = {}
else:
self[code] = self[toplevel_code]
for subcode in filter(inspect.iscode, code.co_consts):
self.add(subcode, toplevel_code=toplevel_code)

def items(self) -> Iterator[Tuple[str, Iterator[Tuple[int, Any]]]]:
"""Iterate on the toplevel code blocks."""
for filename, code in self._toplevel:
measures = self[code]
if not measures:
continue # skip if no measurement
line_iterator = ((line, measures[line]) for line in measures.keys())
yield (filename, line_iterator)

class UDFLineProfiler(LineProfiler):
def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
include_children = kw.get("include_children", False)
backend = kw.get("backend", "psutil")
self.code_map = CodeMapForUDF(include_children=include_children, backend=backend)
self.enable_count = 0
self.max_mem = kw.get("max_mem", None)
self.prevlines: List = []
self.backend = choose_backend(kw.get("backend", None))
self.prev_lineno = None

def __call__(
self,
@@ -246,6 +271,13 @@ def add_function(
else:
self.code_map.add(code, sub_lines=sub_lines, start_line=start_line)

class UDFLineProfilerV2(LineProfiler):
def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
include_children = kw.get("include_children", False)
backend = kw.get("backend", "psutil")
self.code_map = CodeMapForUDFV2(include_children=include_children, backend=backend)


class PStatsParam(AccumulatorParam[Optional[pstats.Stats]]):
"""PStatsParam is used to merge pstats.Stats"""
@@ -290,21 +322,22 @@ def addInPlace(
c1 = dict((k, v) for k, v in l1)
c2 = dict((k, v) for k, v in l2)
udf_code_map: Dict[int, Optional[MemoryTuple]] = {}
for lineno in c1:
if c1[lineno] and c2[lineno]:
all_line_numbers = set(c1.keys()) | set(c2.keys())
for lineno in all_line_numbers:
c1_line = c1.get(lineno)
c2_line = c2.get(lineno)
if c1_line and c2_line:
# c1, c2 should have same keys - line number
udf_code_map[lineno] = (
cast(MemoryTuple, c1[lineno])[0]
+ cast(MemoryTuple, c2[lineno])[0], # increment
cast(MemoryTuple, c1[lineno])[1]
+ cast(MemoryTuple, c2[lineno])[1], # mem_usage
cast(MemoryTuple, c1[lineno])[2]
+ cast(MemoryTuple, c2[lineno])[2], # occurrences
cast(MemoryTuple, c1_line)[0] + cast(MemoryTuple, c2_line)[0], # increment
cast(MemoryTuple, c1_line)[1] + cast(MemoryTuple, c2_line)[1], # mem_usage
cast(MemoryTuple, c1_line)[2]
+ cast(MemoryTuple, c2_line)[2], # occurrences
)
elif c1[lineno]:
udf_code_map[lineno] = cast(MemoryTuple, c1[lineno])
elif c2[lineno]:
udf_code_map[lineno] = cast(MemoryTuple, c2[lineno])
elif c1_line:
udf_code_map[lineno] = cast(MemoryTuple, c1_line)
elif c2_line:
udf_code_map[lineno] = cast(MemoryTuple, c2_line)
else:
udf_code_map[lineno] = None
value1[filename] = [(k, v) for k, v in udf_code_map.items()]
@@ -443,10 +476,14 @@ def _show_results(

float_format = "{0}.{1}f".format(precision + 4, precision)
template_mem = "{0:" + float_format + "} MiB"
for lineno, mem in lines:

lines_dict = {line[0]: line[1] for line in lines}
linenos = range(min(lines_dict), max(lines_dict) + 1)
for lineno in linenos:
total_mem: Union[float, str]
inc: Union[float, str]
occurrences: Union[float, str]
mem = lines_dict.get(lineno)
if mem:
inc = mem[0]
total_mem = mem[1]
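For context, the `addInPlace` change above merges per-line `(increment, mem_usage, occurrences)` tuples over the union of line numbers, since two results may cover different lines (for example, branches hit only in some partitions). A standalone sketch of that merge logic, with names simplified for illustration:
```py
from typing import Dict, Optional, Tuple

MemoryTuple = Tuple[float, float, int]  # (increment, mem_usage, occurrences)

def merge_line_maps(
    c1: Dict[int, Optional[MemoryTuple]],
    c2: Dict[int, Optional[MemoryTuple]],
) -> Dict[int, Optional[MemoryTuple]]:
    merged: Dict[int, Optional[MemoryTuple]] = {}
    # Iterate over the union of line numbers so that lines measured in only
    # one of the two results are still kept.
    for lineno in set(c1) | set(c2):
        a, b = c1.get(lineno), c2.get(lineno)
        if a and b:
            merged[lineno] = (a[0] + b[0], a[1] + b[1], a[2] + b[2])
        else:
            merged[lineno] = a or b
    return merged
```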
5 changes: 5 additions & 0 deletions python/pyspark/sql/connect/session.py
@@ -938,6 +938,11 @@ def showPerfProfiles(self, id: Optional[int] = None) -> None:

showPerfProfiles.__doc__ = PySparkSession.showPerfProfiles.__doc__

def showMemoryProfiles(self, id: Optional[int] = None) -> None:
self._profiler_collector.show_memory_profiles(id)

showMemoryProfiles.__doc__ = PySparkSession.showMemoryProfiles.__doc__


SparkSession.__doc__ = PySparkSession.__doc__

5 changes: 5 additions & 0 deletions python/pyspark/sql/session.py
@@ -2127,6 +2127,11 @@ def showPerfProfiles(self, id: Optional[int] = None) -> None:

showPerfProfiles.__doc__ = ProfilerCollector.show_perf_profiles.__doc__

def showMemoryProfiles(self, id: Optional[int] = None) -> None:
self._profiler_collector.show_memory_profiles(id)

showMemoryProfiles.__doc__ = ProfilerCollector.show_memory_profiles.__doc__


def _test() -> None:
import os
59 changes: 59 additions & 0 deletions python/pyspark/sql/tests/connect/test_parity_memory_profiler.py
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import os
import unittest

from pyspark.tests.test_memory_profiler import MemoryProfiler2TestsMixin, _do_computation
from pyspark.testing.connectutils import ReusedConnectTestCase


class MemoryProfilerParityTests(MemoryProfiler2TestsMixin, ReusedConnectTestCase):
def setUp(self) -> None:
super().setUp()
self.spark._profiler_collector._value = None

def test_memory_profiler_udf_multiple_actions(self):
def action(df):
df.collect()
df.show()

with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
_do_computation(self.spark, action=action)

self.assertEqual(6, len(self.profile_results), str(list(self.profile_results)))

for id in self.profile_results:
with self.trap_stdout() as io:
self.spark.showMemoryProfiles(id)

self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
self.assertRegex(
io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
)


if __name__ == "__main__":
from pyspark.sql.tests.connect.test_parity_memory_profiler import * # noqa: F401

try:
import xmlrunner # type: ignore[import]

testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
except ImportError:
testRunner = None
unittest.main(testRunner=testRunner, verbosity=2)
