Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Work around flakiness in spill hysteresis #5850

Closed
wants to merge 12 commits into from
14 changes: 10 additions & 4 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from inspect import isawaitable
from queue import Empty
from time import sleep as sync_sleep
from typing import ClassVar
from typing import TYPE_CHECKING, ClassVar, Literal

import psutil
from tornado import gen
Expand Down Expand Up @@ -45,6 +45,9 @@
)
from .worker import Worker, parse_memory_limit, run

if TYPE_CHECKING:
from .diagnostics.plugin import NannyPlugin

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -94,6 +97,7 @@ def __init__(
services=None,
name=None,
memory_limit="auto",
memory_terminate_fraction: float | Literal[False] | None = None,
reconnect=True,
validate=False,
quiet=False,
Expand Down Expand Up @@ -203,8 +207,10 @@ def __init__(
self.worker_kwargs = worker_kwargs

self.contact_address = contact_address
self.memory_terminate_fraction = dask.config.get(
"distributed.worker.memory.terminate"
self.memory_terminate_fraction = (
memory_terminate_fraction
if memory_terminate_fraction is not None
else dask.config.get("distributed.worker.memory.terminate")
)

self.services = services
Expand All @@ -231,7 +237,7 @@ def __init__(
"plugin_remove": self.plugin_remove,
}

self.plugins = {}
self.plugins: dict[str, NannyPlugin] = {}

super().__init__(
handlers=handlers, io_loop=self.loop, connection_args=self.connection_args
Expand Down
250 changes: 126 additions & 124 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import importlib
import logging
import math
import os
import sys
import threading
Expand Down Expand Up @@ -35,7 +36,7 @@
wait,
)
from distributed.comm.registry import backends
from distributed.compatibility import LINUX, WINDOWS
from distributed.compatibility import LINUX, MACOS, WINDOWS
from distributed.core import CommClosedError, Status, rpc
from distributed.diagnostics import nvml
from distributed.diagnostics.plugin import PipInstall
Expand Down Expand Up @@ -1282,6 +1283,30 @@ async def test_spill_constrained(c, s, w):
assert set(w.data.disk) == {x.key}


class BadSizeof:
    """Test payload with independently tunable real and reported memory usage.

    ``process`` controls how much actual process memory the object occupies
    (via a string of that many characters), while ``managed`` is the value
    reported through ``__sizeof__``, letting tests decouple measured process
    memory from the managed memory the worker believes it is tracking.
    """

    # dummy *args is to facilitate Client.map
    def __init__(self, *args, process: float, managed: float):
        proc = int(process)
        self.process = proc
        self.managed = int(managed)
        self.data = "x" * proc

    def __sizeof__(self) -> int:
        # Report the configured managed size instead of the real footprint
        return self.managed

    def __getstate__(self):
        """Do not rely on actual disk speed, which is sluggish and unpredictable on CI.
        Also remove the impact of installing lz4 or blosc, which would shrink down
        self.data to kilobytes.
        """
        # Serialize only the two integers; the bulky payload is rebuilt on load
        return self.process, self.managed

    def __setstate__(self, state) -> None:
        proc, managed = state
        self.process = proc
        self.managed = managed
        self.data = "x" * proc


@requires_zict
@gen_cluster(
nthreads=[("", 1)],
Expand All @@ -1306,142 +1331,119 @@ async def test_spill_spill_threshold(c, s, a):
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 300e6) / 0.7

class UnderReport:
"""100 MB process memory, 10 bytes reported managed memory"""

def __init__(self, *args):
self.data = "x" * int(100e6)

def __sizeof__(self):
return 10

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return UnderReport, ()

futures = c.map(UnderReport, range(8))

futures = c.map(BadSizeof, range(8), process=100e6, managed=10)
while not a.data.disk:
await asyncio.sleep(0.01)


async def assert_not_everything_is_spilled(w: Worker) -> None:
    """Watch the worker for 0.5s and fail if at any point its SpillBuffer
    holds nothing in memory, i.e. everything was dumped to disk.
    """
    deadline = time() + 0.5
    while time() < deadline:
        assert w.data
        if not w.data.memory:  # type: ignore
            # The hysteresis system fails on Windows and MacOSX because process memory
            # is very slow to shrink down after calls to PyFree. As a result,
            # Worker.memory_monitor will continue spilling until there's nothing left.
            # Nothing we can do about this short of finding either a way to change this
            # behaviour at OS level or a better measure of allocated memory.
            assert not LINUX, "All data was spilled to disk"
            raise pytest.xfail("https://github.com/dask/distributed/issues/5840")
        await asyncio.sleep(0)


@pytest.mark.slow
@requires_zict
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
# FIXME https://github.com/dask/distributed/issues/5367
# Can't reconfigure the absolute target threshold after the worker
# started, so we're setting it here to something extremely small and then
# increasing the memory_limit dynamically below in order to test the
# spill threshold.
memory_limit=1,
memory_monitor_interval="10ms",
memory_target_fraction=False,
memory_spill_fraction=0.7,
memory_pause_fraction=False,
),
@pytest.mark.parametrize(
"memory_target_fraction,managed,min_spill,max_spill",
[
# no target -> no hysteresis
# Over-report managed memory to test that the automated LRU eviction based on
# target is never triggered
(False, 10e9, 1, 3),
# Under-report managed memory, so that we reach the spill threshold for process
# memory without first reaching the target threshold for managed memory
# target == spill -> no hysteresis
(0.7, 1, 1, 3),
# target < spill -> hysteresis from spill to target
(0.4, 1, 4, 8),
],
)
async def test_spill_no_target_threshold(c, s, a):
"""Test that you can enable the spill threshold while leaving the target threshold
to False
@gen_cluster(nthreads=[], client=True)
async def test_spill_hysteresis(
c, s, memory_target_fraction, managed, min_spill, max_spill
):
"""
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 300e6) / 0.7 # 300 MB before we start spilling

class OverReport:
"""Configurable process memory, 10 GB reported managed memory"""

def __init__(self, size):
self.data = "x" * size

def __sizeof__(self):
return int(10e9)

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return OverReport, (len(self.data),)

f1 = c.submit(OverReport, 0, key="f1")
await wait(f1)
assert set(a.data.memory) == {"f1"}

# 800 MB. Use large chunks to stimulate timely release of process memory.
futures = c.map(OverReport, range(int(100e6), int(100e6) + 8))

while not a.data.disk:
await asyncio.sleep(0.01)
assert "f1" in a.data.disk

# Spilling normally starts at the spill threshold and stops at the target threshold.
# In this special case, it stops as soon as the process memory goes below the spill
# threshold, e.g. without a hysteresis cycle. Test that we didn't instead dump the
# whole data to disk (memory_limit * target = 0)
await assert_not_everything_is_spilled(a)


@pytest.mark.slow
@requires_zict
@gen_cluster(
nthreads=[("", 1)],
client=True,
worker_kwargs=dict(
memory_limit="1 GiB", # See FIXME note in previous test
1. Test that you can enable the spill threshold while leaving the target threshold
to False
2. Test the hysteresis system where, once you reach the spill threshold, the worker
won't stop spilling until the target threshold is reached
"""
# Run the test in a freshly spawned interpreter to ensure a clear memory situation,
# as opposed to the potentially heavily fragmented and unpredictable condition of
# the process used to run all the tests so far
async with Nanny(
s.address,
# Start spilling after ~950 MiB managed memory
# (assuming ~100 MiB unmanaged memory)
memory_limit="1500 MiB",
memory_monitor_interval="10ms",
memory_target_fraction=0.4,
memory_target_fraction=memory_target_fraction,
memory_spill_fraction=0.7,
memory_pause_fraction=False,
),
)
async def test_spill_hysteresis(c, s, a):
memory = psutil.Process().memory_info().rss
a.memory_limit = (memory + 1e9) / 0.7 # Start spilling after 1 GB

# Under-report managed memory, so that we reach the spill threshold for process
# memory without first reaching the target threshold for managed memory
class UnderReport:
def __init__(self):
self.data = "x" * int(100e6) # 100 MB
memory_terminate_fraction=False,
) as nanny:

async def nspilled() -> int:
out = await c.run(lambda dask_worker: len(dask_worker.data.disk))
return out[nanny.worker_address]

nfuts_for_spilling = math.ceil((1500 * 0.7 - s.memory.process / 2**20) / 100)
print(f"Process memory: {s.memory.process / 2**20:.0f} MiB")
print(f"Initial load: {nfuts_for_spilling} * 100 MiB")
assert nfuts_for_spilling > 6

# Add 100 MiB process memory. Spilling must not happen, even when managed=10GB
futures = [
c.submit(
BadSizeof,
process=100 * 2**20,
managed=managed,
pure=False,
)
]
await wait(futures)
await asyncio.sleep(0.2)
assert await nspilled() == 0

# Add another ~800MB process memory. This should start the spilling.
futures += c.map(
BadSizeof,
range(nfuts_for_spilling - 1),
process=100 * 2**20,
managed=managed,
)

def __sizeof__(self):
return 1
# Wait until spilling starts
start = time()
while not await nspilled():
# Different OSs/test environments can get stuck because of slightly
# different memory management algorithms and base unmanaged memory. Add an
# extra 100MB every 0.5s to compensate. Can't do this too fast otherwise a
# very slow CI will fail on the number of elements actually spilled later.
if time() > start + 0.5: # pragma: nocover
print("Did not spill; adding a future")
futures.append(
c.submit(
BadSizeof,
process=100 * 2**20,
managed=managed,
pure=False,
)
)
start = time()
await asyncio.sleep(0.1)

def __reduce__(self):
"""Speed up test by writing very little to disk when spilling"""
return UnderReport, ()
# Wait until spilling stops
prev_n = -1
while prev_n == -1 or time() < start + 0.5:
n = await nspilled()
if n == len(futures):
exc_cls = pytest.xfail if MACOS else AssertionError
raise exc_cls(
"The whole content of the SpillBuffer was spilled to disk; see "
"https://github.com/dask/distributed/issues/5840."
)
if n != prev_n:
prev_n = n
start = time() # We just spilled; reset timer
await asyncio.sleep(0.1)

max_in_memory = 0
futures = []
while not a.data.disk:
futures.append(c.submit(UnderReport, pure=False))
max_in_memory = max(max_in_memory, len(a.data.memory))
await wait(futures)
await asyncio.sleep(0.05)
max_in_memory = max(max_in_memory, len(a.data.memory))

# If there were no hysteresis, we would lose exactly 1 key.
# Note that, for this test to be meaningful, memory must shrink down readily when
# we deallocate Python objects. This is not always the case on Windows and MacOSX;
# on Linux we set MALLOC_TRIM to help in that regard.
# To verify that this test is useful, set target=spill and watch it fail.
while len(a.data.memory) > max_in_memory - 3:
await asyncio.sleep(0.01)
await assert_not_everything_is_spilled(a)
assert min_spill <= await nspilled() <= max_spill


@pytest.mark.slow
Expand Down
13 changes: 4 additions & 9 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3743,7 +3743,6 @@ def check_pause(memory):
"Worker is at %.0f%% memory usage. Start spilling data to disk.",
frac * 100,
)
start = time()
# Implement hysteresis cycle where spilling starts at the spill threshold
# and stops at the target threshold. Note that here the target threshold
# refers to process memory, whereas normally it refers to reported managed
Expand All @@ -3768,18 +3767,14 @@ def check_pause(memory):
break
weight = self.data.evict()
if weight == -1:
# Failed to evict: disk full, spill size limit exceeded, or pickle error
# Failed to evict:
# disk full, spill size limit exceeded, or pickle error
break

total += weight
count += 1
# If the current buffer is filled with a lot of small values,
# evicting one at a time is very slow and the worker might
# generate new data faster than it is able to evict. Therefore,
# only pass on control if we spent at least 0.5s evicting
if time() - start > 0.5:
await asyncio.sleep(0)
start = time()
await asyncio.sleep(0)

memory = proc.memory_info().rss
if total > need and memory > target:
# Issue a GC to ensure that the evicted data is actually
Expand Down