Skip to content

Commit

Permalink
Allow memory monitor to evict data more aggressively (#3424)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Jan 30, 2020
1 parent 8eadf5e commit 3e7bbbd
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2584,36 +2584,44 @@ async def memory_monitor(self):
memory = proc.memory_info().rss
frac = memory / self.memory_limit

# Pause worker threads if above 80% memory use
if self.memory_pause_fraction and frac > self.memory_pause_fraction:
# Try to free some memory while in paused state
self._throttled_gc.collect()
if not self.paused:
def check_pause(memory):
frac = memory / self.memory_limit
# Pause worker threads if above 80% memory use
if self.memory_pause_fraction and frac > self.memory_pause_fraction:
# Try to free some memory while in paused state
self._throttled_gc.collect()
if not self.paused:
logger.warning(
"Worker is at %d%% memory usage. Pausing worker. "
"Process memory: %s -- Worker memory limit: %s",
int(frac * 100),
format_bytes(memory),
format_bytes(self.memory_limit)
if self.memory_limit is not None
else "None",
)
self.paused = True
elif self.paused:
logger.warning(
"Worker is at %d%% memory usage. Pausing worker. "
"Worker is at %d%% memory usage. Resuming worker. "
"Process memory: %s -- Worker memory limit: %s",
int(frac * 100),
format_bytes(proc.memory_info().rss),
format_bytes(memory),
format_bytes(self.memory_limit)
if self.memory_limit is not None
else "None",
)
self.paused = True
elif self.paused:
logger.warning(
"Worker is at %d%% memory usage. Resuming worker. "
"Process memory: %s -- Worker memory limit: %s",
int(frac * 100),
format_bytes(proc.memory_info().rss),
format_bytes(self.memory_limit)
if self.memory_limit is not None
else "None",
)
self.paused = False
self.ensure_computing()
self.paused = False
self.ensure_computing()

check_pause(memory)
# Dump data to disk if above 70%
if self.memory_spill_fraction and frac > self.memory_spill_fraction:
logger.debug(
"Worker is at %d%% memory usage. Start spilling data to disk.",
int(frac * 100),
)
start = time()
target = self.memory_limit * self.memory_target_fraction
count = 0
need = memory - target
Expand All @@ -2624,7 +2632,7 @@ async def memory_monitor(self):
"to store to disk. Perhaps some other process "
"is leaking memory? Process memory: %s -- "
"Worker memory limit: %s",
format_bytes(proc.memory_info().rss),
format_bytes(memory),
format_bytes(self.memory_limit)
if self.memory_limit is not None
else "None",
Expand All @@ -2634,14 +2642,21 @@ async def memory_monitor(self):
del k, v
total += weight
count += 1
await asyncio.sleep(0)
# If the current buffer is filled with a lot of small values,
# evicting one at a time is very slow and the worker might
# generate new data faster than it is able to evict. Therefore,
# only yield control back to the event loop once at least 0.5s
# has been spent evicting.
if time() - start > 0.5:
await asyncio.sleep(0)
start = time()
memory = proc.memory_info().rss
if total > need and memory > target:
# Issue a GC to ensure that the evicted data is actually
# freed from memory and taken into account by the monitor
# before trying to evict even more data.
self._throttled_gc.collect()
memory = proc.memory_info().rss
check_pause(memory)
if count:
logger.debug(
"Moved %d pieces of data data and %s to disk",
Expand Down

0 comments on commit 3e7bbbd

Please sign in to comment.