
Allow memory monitor to evict data more aggressively #3424

Merged · 1 commit merged into dask:master from the memory_monitor_improvements branch on Jan 30, 2020

Conversation

@fjetter (Member) commented on Jan 29, 2020

I am running heavy shuffle operations (`dask.DataFrame.groupby.apply`) and am observing a few issues regarding spill-to-disk:

  1. My workers are often running out of memory. Investigating the state of the workers before this happens, I see a lot of small-ish tasks which would be suitable for spill-to-disk, but spilling only helps if I set very aggressive thresholds, and even then the worker simply cannot keep up.
  2. Even though my workers are killed by the nanny, the pause threshold is rarely ever triggered.

Looking at the code, what I think is happening is:

The memory monitor hits the spill threshold and starts evicting one piece of data at a time. In my scenario there are usually 100-200 items on a given worker, i.e. there is a lot of potential to spill, and the sizes of the data pieces vary from KBs up to hundreds of MBs. The memory monitor loops until we hit our target threshold, but it returns control to the event loop on every individual loop iteration. Since the eviction is based on an LRU mechanism rather than a weight-based approach, this is suboptimal: it takes many iterations to get the data evicted, and in the meantime either new data is fetched or the processing thread simply continues to generate more data, so the worker can rarely spill as fast as it produces new data.
In this case it is likely that the inner loop of the memory monitor is never left, i.e. the pause is never triggered and the worker continues to compute and fetch dependencies, which ultimately may cause the worker to fail.
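Roughly, the pattern described here looks like the following sketch (illustrative only, not the actual `distributed.Worker.memory_monitor`; `fast` stands in for the in-memory store, modelled as an `OrderedDict` used as an LRU, `spill_to_disk` for the slow on-disk mapping, and `memory_limit`/`target` for the configured thresholds):

```python
import asyncio

import psutil


async def memory_monitor_sketch(fast, spill_to_disk, memory_limit, target=0.6):
    proc = psutil.Process()
    frac = proc.memory_info().rss / memory_limit

    while frac > target and fast:
        # Evict a single key chosen purely by recency (LRU), not by size
        key, value = fast.popitem(last=False)
        spill_to_disk(key, value)

        # Hand control back to the event loop after *every* eviction;
        # meanwhile the worker keeps computing and fetching new data
        await asyncio.sleep(0)

        frac = proc.memory_info().rss / memory_limit
```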

Therefore I suggest two fixes here:

  1. Check for the pause threshold every time we finish one for-loop cycle (i.e. after every eviction).
  2. Evict more aggressively, i.e. allow the worker to evict data for a continuous amount of time without passing control back to the event loop. This allows it to spill many small items, or individually a few big items. Regardless, this gives the worker more control over its memory (see the sketch below).

Alternative to 1.: instead of checking in every for-loop iteration, we could check only whenever we return control to the event loop.
Alternative to 2.: instead of using a simple LRU mechanism, we could employ something which also takes the size of the individual elements into account, or only the size. Not sure what we should aim for here.
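Concretely, the two fixes together would look roughly like this (an illustrative sketch only, reusing the stand-in names from the sketch above; `pause` and `max_spill_time` are hypothetical parameters, with 0.5 s matching the continuous spill window discussed below):

```python
import asyncio
import time

import psutil


async def memory_monitor_sketch(fast, spill_to_disk, memory_limit,
                                target=0.6, pause=0.8, max_spill_time=0.5):
    proc = psutil.Process()
    paused = False
    frac = proc.memory_info().rss / memory_limit

    while frac > target and fast:
        start = time.time()
        # Fix 2: spill continuously for up to `max_spill_time` seconds
        # before yielding, instead of yielding after every single eviction
        while fast and time.time() - start < max_spill_time:
            key, value = fast.popitem(last=False)
            spill_to_disk(key, value)

            frac = proc.memory_info().rss / memory_limit
            # Fix 1: re-check the pause threshold after every eviction
            paused = frac > pause
            if frac <= target:
                break

        await asyncio.sleep(0)  # yield at most once per spill window
        frac = proc.memory_info().rss / memory_limit

    return paused
```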

Open for further suggestions. In my situation this fix already improves stability quite a lot.

@mrocklin (Member) commented:

> Check for the pause threshold every time we finish one for-loop cycle (i.e. after every eviction).

I'm not sure that this is currently happening. I think that the `check_pause` function depends on the value `frac`, which is computed at the beginning of the function with a `memory_info().rss` call. I think that we don't update this from within the for loop (which might be good, because calling `memory_info()` can be a little slow).
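For illustration, my reading of the concern is roughly this pattern (a sketch, not the code in distributed/worker.py; `make_check_pause` and its arguments are stand-ins):

```python
import psutil


def make_check_pause(memory_limit, pause_fraction=0.8):
    proc = psutil.Process()
    # `frac` is measured once, before any eviction happens ...
    frac = proc.memory_info().rss / memory_limit

    def check_pause():
        # ... so this check keeps using that stale value; evictions (or new
        # allocations) inside the loop never change its answer
        return frac > pause_fraction

    return check_pause
```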

> Evict more aggressively, i.e. allow the worker to evict data for a continuous amount of time without passing control back to the event loop. This allows it to spill many small items, or individually a few big items. Regardless, this gives the worker more control over its memory.

This makes sense to me, especially for many small pieces of data.

> Open for further suggestions. In my situation this fix already improves stability quite a lot.

I'm very glad to hear this! And thank you for finding and fixing this stability issue.

If the 500 ms wait time between awaits is helping, then I'm happy to merge that in now.

If you have the time, I would appreciate it if you would look over the `check_pause` function and verify that your understanding is correct. I went through it briefly, and could easily be misunderstanding something.

@fjetter force-pushed the memory_monitor_improvements branch from 73b8744 to a7e7c40 on January 30, 2020 at 08:53
@fjetter (Member, Author) commented on Jan 30, 2020

You are right, of course, about `check_pause`. I noticed that we actually check the memory on every loop iteration and again for the log messages. I removed the call in the log messages. This will probably not change much, but it still felt unnecessary. The check in the loop itself is required to infer whether we can actually stop spilling.
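Roughly, the resulting arrangement is this (a sketch with the same stand-in names as the earlier sketches, not the actual code): the memory reading taken once per iteration serves both the loop condition and the log message, so no second `memory_info()` call is needed just for logging.

```python
import logging

import psutil

logger = logging.getLogger(__name__)


def spill_until_under_target(fast, spill_to_disk, memory_limit, target=0.6):
    proc = psutil.Process()
    memory = proc.memory_info().rss

    while memory / memory_limit > target and fast:
        key, value = fast.popitem(last=False)
        spill_to_disk(key, value)

        # One measurement per iteration, reused for both the stop condition
        # and the log message instead of calling memory_info() twice
        memory = proc.memory_info().rss
        logger.debug("Spilled %s; process memory is now %d bytes", key, memory)
```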

On another note: another source of data is the `ensure_communicating` method, where we fetch dependencies for the tasks about to be computed. I'm wondering if this should also be stopped when the worker is paused (I haven't tested it, just want to hear an opinion).
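For illustration, the guard I have in mind would look something like this (purely hypothetical; `WorkerSketch`, `paused`, `pending`, and `fetch` are made-up stand-ins, not the actual `ensure_communicating`):

```python
from dataclasses import dataclass, field


@dataclass
class WorkerSketch:
    paused: bool = False
    pending: list = field(default_factory=list)  # dependency keys still to fetch

    def fetch(self, dep):
        print(f"fetching dependency {dep}")

    def ensure_communicating(self):
        # Hypothetical guard: skip starting new transfers while paused so
        # that incoming dependencies do not add to the memory pressure
        if self.paused:
            return
        while self.pending:
            self.fetch(self.pending.pop())
```

With `paused=True`, `ensure_communicating` would simply return without starting new transfers and could be retried once the worker resumes.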

@mrocklin (Member) commented:

> Another source of data is the `ensure_communicating` method, where we fetch dependencies for the tasks about to be computed. I'm wondering if this should also be stopped when the worker is paused (I haven't tested it, just want to hear an opinion).

That makes sense to me. I suggest that if we do this then we do it in another PR though.

@fjetter (Member, Author) commented on Jan 30, 2020

> That makes sense to me. I suggest that if we do this then we do it in another PR though.

Sure, I'll run a few tests and open another PR

@mrocklin merged commit 3e7bbbd into dask:master on Jan 30, 2020
@mrocklin (Member) commented:

OK. Merging this in. Thank you as always @fjetter !

bnaul pushed a commit to replicahq/distributed that referenced this pull request on Feb 10, 2020
@crusaderky (Collaborator) commented:

From my latest convo with @fjetter:
what we now suspect is actually happening is that:
