Memory errors on distributed dask cluster #668

Closed
jacobtomlinson opened this issue Dec 6, 2018 · 16 comments · Fixed by #717

@jacobtomlinson

Description

I have a persisted dask dataframe which is larger than the memory available on my notebook server or on any of the individual workers. The data consists of x, y, z lidar points.

When I try to plot with Datashader, it seems to transfer the whole dataframe to the notebook during aggregation, before plotting.

ddf = client.persist(dd.read_parquet('Some 20GB dataset'))
cvs = ds.Canvas(900, 525)
agg = cvs.points(ddf, 'x', 'y', agg=ds.mean('z'))

This results in 20GB of data being transferred to my notebook (and it gets killed by the OOM killer as I only have 16GB of RAM).

Your environment

Datashader version: 0.6.8
Dask version: 0.20.0
Distributed version: 1.24.0

@niallrobinson

@jbednar is this just the way it works or are we doing something wrong?

@jbednar
Member

jbednar commented Dec 19, 2018

From that description, I'm not sure exactly what you mean. In my own work, I've previously used .persist() only on a single machine, when I want the entire dataframe to be put into memory on that one machine (and kept there). Here you have a distributed computation, and though I haven't tested it, I would have expected (like you) that each chunk would be persisted on the individual worker nodes, without the entire dataframe being instantiated on any one process. It sounds like that's not what you're seeing, but I'm not sure what you mean by the "notebook" in this case; do you mean the Jupyter kernel process, which is presumably where your dask distributed scheduler is running?

@mrocklin

Persist makes sure that chunks of data are around on some worker. That data is usually not on the main machine.

Then I would expect datashader to do some complex groupby aggregation and call compute.

df.groupby([df.x.round(...), df.y.round(...)]).count().compute()

When you call compute you get a pandas dataframe on the client. I would expect this to be less than the full 20GB.

Sometimes people have calls like df = df.compute() which brings distributed data local. I wouldn't expect that from the datashader codebase though.
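
For illustration, a minimal sketch of that distinction (the scheduler address and file path here are hypothetical):

import dask.dataframe as dd
from dask.distributed import Client

client = Client('tcp://scheduler:8786')          # hypothetical scheduler address

ddf = dd.read_parquet('some_20GB_dataset.parq')  # hypothetical path
ddf = client.persist(ddf)                        # chunks stay on the workers; the client only holds futures

agg = ddf.groupby('x').z.mean().compute()        # a small pandas result comes back to the client
# pdf = ddf.compute()                            # by contrast, this pulls the entire dataframe to the client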

If I were to diagnose this I would probably look through the code for any place where compute/persist is called. I might also add a bit of logging and some sleeps to try to narrow down where in the datashader pipeline data is pouring into the client/Jupyter process.

@jacobtomlinson
Author

Thanks for your responses both.

@jbednar yes, you're right, I mean the kernel, which is where my scheduler is running (using dask-kubernetes).

@mrocklin The end result of the processing is an xarray DataArray which I would imagine could remain distributed after the groupby.

@mrocklin

> @mrocklin The end result of the processing is an xarray DataArray which I would imagine could remain distributed after the groupby.

Groupby aggregations always end up in a single partition. I would expect the output of datashader's process to have 900 * 525 rows, so I'm not sure I understand this.

@jacobtomlinson
Author

jacobtomlinson commented Dec 20, 2018

> Groupby aggregations always end up in a single partition.

Fair enough. But, as you say, I would imagine this partition to be only 900 * 525 rows. Unless the reduction (mean, count, etc.) is performed after the data is gathered into the partition? That could result in the 900 * 525 dataframe holding all of the data points in each row.

@mrocklin

Groupby-aggregations are computed by doing groupby aggregations on the partitions, then merging a few, doing more groupby-aggregations on those, and so on in a tree reduction until we get to a final result. There is never much memory in any particular partition (assuming that the number of groups is manageable).

As an example, we accomplish a groupby-mean by doing a groupby-sum and groupby-count on each partition, then doing a groupby-sum on both of those until we get down to one, then dividing the result on the final partition.

However, datashader does different things than dask.dataframe. I'm not as familiar with their algorithms, but I suspect that they do something similar.
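
As a rough sketch of that groupby-mean strategy in plain pandas (the helper names are made up; dask's real implementation is more general):

import pandas as pd

def partial_agg(pdf):
    # Per-partition pass: keep only a small per-group sum/count table.
    return pdf.groupby('key')['value'].agg(['sum', 'count'])

def combine(partials):
    # Combine step: sums and counts are associative, so the small tables
    # can be merged pairwise (a tree reduction) until one table remains.
    return pd.concat(partials).groupby(level=0).sum()

# Stand-ins for the per-partition pandas frames held on the workers:
partitions = [pd.DataFrame({'key': ['a', 'b'], 'value': [1.0, 2.0]}),
              pd.DataFrame({'key': ['a', 'b'], 'value': [3.0, 4.0]})]

merged = combine([partial_agg(p) for p in partitions])
mean = merged['sum'] / merged['count']   # the final division happens on one small partition
print(mean)                              # a -> 2.0, b -> 3.0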

@jonmmease self-assigned this Jan 10, 2019
@jonmmease
Collaborator

Hi @jacobtomlinson,

Wanted to let you know that I'm planning to take a look at this, as it's definitely an important use case (and something that Datashader+Dask should be able to handle). Unfortunately, it probably won't be until early February that I'll have a compute/storage environment set up to reproduce what you're seeing.

@jacobtomlinson
Author

@jonmmease thanks for looking into this! If you want access to our JupyterHub/Dask environment for testing and reproduction then let me know and we can get you an account on there.

@jonmmease
Collaborator

Thanks for the offer! Once I start digging in I'll let you know if it looks like that would help nail things down.

@jonmmease
Collaborator

In working on another project, I just realized that Datashader's glyph autorange logic for Dask calls numpy's nanmin/nanmax functions.

https://github.com/pyviz/datashader/blob/9a29d72798a50f65b8ba7764053fd787a1a388ea/datashader/glyphs.py#L71-L76

I stepped into these functions in the debugger and it looks like numpy handles these functions by converting the entire dask array into an in-memory numpy array before computing the min/max. This is something we can improve by writing a custom map_partitions function that computes the nanmin and nanmax per partition and then aggregates them.
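
Something like the following, as an untested sketch of that per-partition approach (the helper names and meta are placeholders):

import numpy as np
import pandas as pd

def partition_bounds(pdf, col):
    # Reduce a single pandas partition to a tiny one-row min/max frame.
    values = pdf[col].values
    return pd.DataFrame({'min': [np.nanmin(values)], 'max': [np.nanmax(values)]})

def lazy_range(ddf, col):
    # Apply the per-partition reduction, then combine the tiny results,
    # so the full column never has to be materialized in one process.
    bounds = ddf.map_partitions(partition_bounds, col, meta={'min': 'f8', 'max': 'f8'})
    return bounds['min'].min(), bounds['max'].max()

# x_min, x_max = lazy_range(ddf, 'x')               # still lazy dask scalars
# x_min, x_max = x_min.compute(), x_max.compute()   # only small values reach the client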

@jacobtomlinson, in your example above it looks like you're not specifying x_range/y_range in the Canvas constructor, so this auto-ranging logic is triggered. When you have a chance, I think it would be worth trying your example with an explicit x_range/y_range to see if there is any improvement in local memory usage. E.g. something like:

ddf = client.persist(dd.read_parquet('Some 20GB dataset'))

x0 = ddf['x'].min().compute()
x1 = ddf['x'].max().compute()
y0 = ddf['y'].min().compute()
y1 = ddf['y'].max().compute()

cvs = ds.Canvas(900, 525, x_range=(x0, x1), y_range=(y0, y1))
agg = cvs.points(ddf, 'x', 'y', agg=ds.mean('z'))

@jacobtomlinson
Author

I'll give this a go!

@jacobtomlinson
Author

Sadly I'm still getting the same issue.

@jonmmease
Collaborator

Thanks for giving it a try. My hunch is that this isn't the only place where this kind of thing is happening.

@jacobtomlinson
Author

Yes this is my feeling too. There will be somewhere (or multiple places) where the distributed array is accidentally pulled together.

@mrocklin

One solution to this would be to implement NEP-18 for dask arrays. In that case all operations like np.nanmin would dispatch to the dask.array version. It's a bit of a sledgehammer approach, but could be fun.

The equivalent PR for cupy is here: cupy/cupy#1650
Presumably the Dask array version would look very similar.
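
For reference, a sketch of what that would look like from user code once __array_function__ dispatch is available in both numpy and dask (array sizes are hypothetical; this dispatch is not implemented at the time of writing):

import numpy as np
import dask.array as da

x = da.random.random(100_000_000, chunks=10_000_000)

# Without NEP-18, np.nanmin(x) coerces x into an in-memory numpy array.
# With __array_function__ implemented for dask arrays, the same call
# would dispatch to da.nanmin and stay lazy:
lo = np.nanmin(x)   # would return a lazy dask result
hi = np.nanmax(x)
print(lo.compute(), hi.compute())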
