Skip to content

Commit

Permalink
More robust, faster memory sampler tests (dask#8758)
Browse files — browse the repository at this point in the history
  • Loading branch information
fjetter authored Jul 12, 2024
1 parent 0a8d8c9 commit 48eefee
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 49 deletions.
14 changes: 11 additions & 3 deletions distributed/diagnostics/memory_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def to_pandas(self, *, align: bool = False) -> pd.DataFrame:
if align:
# convert datetime to timedelta from the first sample
s.index -= s.index[0]
ss[label] = s
ss[label] = s[~s.index.duplicated()] # type: ignore[attr-defined]

df = pd.DataFrame(ss)

Expand All @@ -169,8 +169,16 @@ def plot(self, *, align: bool = False, **kwargs: Any) -> Any:
=======
Output of :meth:`pandas.DataFrame.plot`
"""
df = self.to_pandas(align=align).resample("1s").nearest() / 2**30
return df.plot(
df = self.to_pandas(align=align)
resampled = df.resample("1s").nearest() / 2**30
# If resampling collapses data onto one point, we'll run into
# https://stackoverflow.com/questions/58322744/matplotlib-userwarning-attempting-to-set-identical-left-right-737342-0
# This should only happen in tests since users typically sample for more
# than a second
if len(resampled) == 1:
resampled = df.resample("1ms").nearest() / 2**30

return resampled.plot(
xlabel="time",
ylabel="Cluster memory (GiB)",
**kwargs,
Expand Down
107 changes: 62 additions & 45 deletions distributed/diagnostics/tests/test_memory_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,47 @@

import pytest

import dask

from distributed import Client
from distributed.diagnostics import MemorySampler
from distributed.utils_test import gen_cluster
from distributed.utils_test import SizeOf, cluster, gen_cluster, gen_test


@gen_cluster(client=True)
async def test_async(c, s, a, b):
@pytest.fixture(scope="module")
@gen_cluster(client=True, config={"distributed.admin.system-monitor.interval": "1ms"})
async def some_sample(c, s, *workers):
ms = MemorySampler()
async with ms.sample("foo", measure="managed", interval=0.1):
name = "foo"
async with ms.sample(name, measure="managed", interval=0.001):
f = c.submit(lambda: 1)
await f
await asyncio.sleep(0.5)
await asyncio.sleep(0.1)
f.release()
await asyncio.sleep(0.1)

assert ms.samples["foo"][0][1] == 0
assert ms.samples["foo"][-1][1] > 0
assert ms.samples[name][0][1] == 0
assert sum([s[1] for s in ms.samples[name]]) > 0

# Test that there is no server-side memory leak
assert not s.extensions["memory_sampler"].samples
return name, ms


def test_sync(client):
ms = MemorySampler()
with ms.sample("foo", measure="managed", interval=0.1):
f = client.submit(lambda: 1)
f.result()
time.sleep(0.5)
def test_sync(loop):
with (
dask.config.set({"distributed.admin.system-monitor.interval": "1ms"}),
cluster() as (scheduler, _),
Client(scheduler["address"], loop=loop) as client,
):
ms = MemorySampler()
with ms.sample("foo", measure="managed", interval=0.001):
f = client.submit(lambda: 1)
f.result()
time.sleep(0.1)

assert ms.samples["foo"][0][1] == 0
assert ms.samples["foo"][-1][1] > 0
assert ms.samples["foo"][0][1] == 0
assert sum([s[1] for s in ms.samples["foo"]]) > 0


@gen_cluster(client=True) # MemorySampler internally fetches the client
Expand All @@ -46,53 +59,55 @@ async def test_at_least_one_sample(c, s, a, b):
assert len(next(iter(ms.samples.values()))) == 1


@pytest.mark.slow
@gen_cluster(client=True)
async def test_multi_sample(c, s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_multi_sample(c, s, a):
expected_process_memory = 20 * 1024**2

def mock_process_memory():
return expected_process_memory if a.data.fast else 0

a.monitor.get_process_memory = mock_process_memory
a.monitor.update()
await a.heartbeat()

ms = MemorySampler()
s1 = ms.sample("managed", measure="managed", interval=0.15)
s2 = ms.sample("process", interval=0.2)
s1 = ms.sample("managed", measure="managed", interval=0.001)
s2 = ms.sample("process", interval=0.001)

expected_managed_memory = 100 * 1024
payload = SizeOf(expected_managed_memory)

async with s1, s2:
idle_mem = s.memory.process
f = c.submit(lambda: "x" * 100 * 2**20) # 100 MiB
f = c.submit(lambda: payload)
await f
while s.memory.process < idle_mem + 80 * 2**20:
# Wait for heartbeat
await asyncio.sleep(0.01)
await asyncio.sleep(0.6)
a.monitor.update()
await a.heartbeat()
await asyncio.sleep(0.01)

m = ms.samples["managed"]
p = ms.samples["process"]

assert len(m) >= 2
assert m[0][1] == 0
assert m[-1][1] >= 100 * 2**20
assert m[-1][1] == expected_managed_memory
assert len(p) >= 2
assert p[0][1] > 2**20 # Assume > 1 MiB for idle process
assert p[-1][1] > p[0][1] + 80 * 2**20
assert m[-1][1] < p[-1][1]
assert p[0][1] == 0
assert p[-1][1] == expected_process_memory


@gen_cluster(client=True)
@gen_test()
@pytest.mark.parametrize("align", [False, True])
async def test_pandas(c, s, a, b, align):
async def test_pandas(some_sample, align):
name, ms = some_sample
pd = pytest.importorskip("pandas")
pytest.importorskip("matplotlib")

ms = MemorySampler()
async with ms.sample("foo", measure="managed", interval=0.15):
f = c.submit(lambda: 1)
await f
await asyncio.sleep(1.5)

assert ms.samples["foo"][0][1] == 0
assert ms.samples["foo"][-1][1] > 0

df = ms.to_pandas(align=align)
assert isinstance(df, pd.DataFrame)
if align:
assert isinstance(df.index, pd.TimedeltaIndex)
assert df["foo"].iloc[0] == 0
assert df["foo"].iloc[-1] > 0
assert df[name].iloc[0] == 0
assert df[name].sum() > 1
assert df.index[0] == pd.Timedelta(0, unit="s")
assert pd.Timedelta(0, unit="s") < df.index[1]
assert df.index[1] < pd.Timedelta(1.5, unit="s")
Expand All @@ -105,15 +120,17 @@ async def test_pandas(c, s, a, b, align):
assert plt


@pytest.mark.slow
@gen_cluster(client=True)
@pytest.mark.parametrize("align", [False, True])
async def test_pandas_multiseries(c, s, a, b, align):
"""Test that multiple series are upsampled and aligned to each other"""
pd = pytest.importorskip("pandas")

ms = MemorySampler()
for label, interval, final_sleep in (("foo", 0.15, 1.0), ("bar", 0.2, 0.6)):
for label, interval, final_sleep in (
("foo", 0.001, 0.2),
("bar", 0.002, 0.01),
):
async with ms.sample(label, measure="managed", interval=interval):
x = c.submit(lambda: 1, key="x")
await x
Expand Down
5 changes: 4 additions & 1 deletion distributed/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from typing import Any as AnyType
from typing import ClassVar, TypeVar, overload

import click
import psutil
import tblib.pickling_support
from tornado import escape
Expand Down Expand Up @@ -1274,6 +1273,10 @@ def has_keyword(func, keyword):

@functools.lru_cache(1000)
def command_has_keyword(cmd, k):
# Click is a relatively expensive import
# That hurts startup time a little
import click

if cmd is not None:
if isinstance(cmd, str):
try:
Expand Down

0 comments on commit 48eefee

Please sign in to comment.