Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor accessor + flexible indexes + Dask support #18

Merged
merged 12 commits into from
Dec 9, 2020

Conversation

benbovy
Copy link
Member

@benbovy benbovy commented Oct 26, 2020

This PR will eventually be quite big (sorry, at this stage I think it'll be more productive overall than splitting things in many PRs).

Two goals:

  • Flexible indexes, i.e., easy to extend xoak to other indexes, reusing the same API for indexing Datasets / DataArrays
  • Fully lazy, Dask enabled indexing for both for index builds and queries, aligned with index/query point coordinates chunks

TODO:

  • add docstrings
  • add some tests
  • update example notebook

Flexible indexes

Registering a new custom index in xoak can be easily done with the help of a small adapter class that must implement the build and query methods, e.g.,

import xoak
from mypackage import MyIndex

@xoak.register_index('my_index')
class MyIndexAdapter(xoak.IndexAdapter):

    def build(self, points):
        # must return an instance of the wrapped index
        return MyIndex(points)

    def query(self, my_index, points):
        # must return a (distances, positions) tuple of numpy arrays
        return my_index.query(points)

Any option to pass to the underlying index construction should be added as argument in the adapter class' __init__ method (we could address later how query options are handled).

In the example above, my_index is registered in xoak's available indexes. It can be selected like this:

Dataset.xoak.set_index(['lat', 'lon'], 'my_index')

It's also possible to directly provide the adapter class (useful when one does not want to register an index adapter):

Dataset.xoak.set_index(['lat', 'lon'], MyIndexAdapter)

xoak.indexes returns a mapping of all available indexes. As an alternative to the decorator above, it can also be used to register index adapters, e.g.,

xoak.indexes.register('my_index', MyIndexAdapter)

Dask support

This PR implements dask-enabled index build and/or query that is independent of the underlying index. It handles chunked coordinate variables for either or both index / query points.

For chunked index coordinates, a forest of index trees is built (one tree per chunk).

A query is executed in two stages:

  • 1st "map" stage:
    • map the index adapter class query method for each query array chunk and each tree of the forest.
    • concatenate all distances/positions results into two dask arrays of shape (n_points, n_indexes), respectively.
  • 2nd "reduce" stage:
    • brute force lookup: argmin is applied on the distances array over the indexes dimension (columns)

Advantages of this approach:

  • It's simple, efficient and should scale well
  • It can be fully lazy, including the index construction, which means that out-of-core selection is possible in theory.
  • If we need to execute a lot of queries on the same dataset, it is possible to cache the indexes in memory using dask.persist(). There's an option for that (enabled by default):
Dataset.xoak.set_index(['lat', 'lon'], MyIndexAdapter, persist=True)
  • Indexes are not partitioned following the spatial distribution of the points but rather using the coordinate chunks. This could make queries less optimized, but this reduces the amount of data transferred between dask workers.

Potential caveats:

  • Index tree structures can be tricky to serialize/deserialize. It depends on their implementation. Serialization may be not very efficient (it may sometimes require a full tree rebuild). I still need to figure out if we can avoid that with dask (i.e., preferably move the query data to where the indexes live).
  • Chunk sizes of index and/or query coordinates have a great impact on the overall performance and must be set carefully. It greatly depends on which index is used. It unlikely corresponds to optimal chunk sizes for I/O perfomance.

Other changes

  • I removed the transform argument in Dataset.xoak.set_index and the tolerance argument in Dataset.xoak.sel. Supported features may greatly vary from one index to another, so I'd suggest that most logic should be implemented in index adapter classes (with option forwarding), instead of trying to extend an API that is common to all indexes.

  • Dataset.xoak.sel triggers the computation of the query dask graph (if any), because xarray.Dataset.sel does not (yet) support dask arrays as indexers.

  • I think we should leave Dataset.xoak.sel with as few features as possible and on par with xarray.Dataset.sel. It would be great to leverage all the features of tree-based indexes (e.g., return distances, select k-nearest neighbors, lazy selection, set masks or max distances, etc.), but I think that the xoak accessor should provide other methods for those special cases.

Also add sklearn BallTree wrappers.
- Removed "tolerance"
- Removed "transform"
- Plugin IndexWrapper
- Enable dask-based index builds
@benbovy benbovy marked this pull request as draft October 26, 2020 16:58
@benbovy benbovy changed the title Refactor accessor + flexible indexes Refactor accessor + flexible indexes + Dask support Oct 26, 2020
@benbovy
Copy link
Member Author

benbovy commented Oct 29, 2020

I think it's ready for review. @willirath @koldunovn if you want to take a look (I know, it's quite big).

I don't know why lint tests are failing here (I run black on those files). Tests are running fine locally.

We still need some more tests for the accessor. That can be done in #10 after merging this (probably a couple of merge conflicts to solve).

I updated the example notebook.

There's one performance issue when index coordinates are chunked (I think it's a xarray issue), but we can go ahead and address this later IMO.

@benbovy
Copy link
Member Author

benbovy commented Oct 30, 2020

There's one performance issue when index coordinates are chunked (I think it's a xarray issue)

Opened pydata/xarray#4555

@benbovy
Copy link
Member Author

benbovy commented Oct 30, 2020

Let's also track pydata/xarray#2511 so that eventually we won't have to trigger computation of the dask graph in .xoak.sel().

Copy link
Contributor

@willirath willirath left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great. I especially love how easy it is to add any indexer: https://github.com/ESM-VFC/xoak/pull/18/files#diff-436fac86313153a2042128086fef47f4996ea933c8d3697ffdbdbd68d50d3146

I'll give it a test with some bigger Dask based data and report back.

@benbovy
Copy link
Member Author

benbovy commented Nov 3, 2020

I've done some tests on a bigger server with the S2 point index, still using randomly located points.

Settings:

  • Dask local cluster of 60 workers with 1 thread each (I have issues with multi-threads)
  • 60M indexed points (60 chunks of 1M points)
  • 9M query points (10 chunks of 900k points)

Results:

  • building the 60 indexes and persisting them in memory takes 2 seconds
  • querying the points takes 50 seconds

Caveats:

The results above were obtained after I spent a while on tweaking chunks, dask cluster and dask configuration (e.g., disable work stealing). It turns out to be very tricky in reality. Scaling is also highly limited by memory.

There's one major issue here: during the query, the indexes get replicated on a growing number of dask workers, which causes memory blow-up and significant drop in performance due to index data serialization (which, in the case of the S2, means rebuilding indexes from scratch!). I suspect that the dask's scheduler has a poor idea on the memory footprint of those indexes (persisted in memory). This may explain why dask's heuristics to assign tasks to the workers miserably fails in this case.

I need to figure out:

  • If we can give to dask a better idea on the memory footprint of each index
  • If / how best we can force dask to submit the query tasks to the workers where the index lives in memory, i.e., move query point data between workers rather than indexes objects (assuming that query point data is generally smaller than index point data).

@benbovy
Copy link
Member Author

benbovy commented Nov 4, 2020

If we can give to dask a better idea on the memory footprint of each index

Turns out we just need to define the __sizeof__ method for the index wrapper classes. It does yield better performance, but it still doesn't solve all the issues. We could imagine to "fool" dask by returning a very large size and hope that indexes won't be replicated too much, but that's not very elegant.

if / how best we can force dask to submit the query tasks to the workers where the index lives in memory

It's pretty straightforward to do this using client.compute(..., workers=...). It doesn't play well with resilience, though (it may freeze the computation if workers die or indexes are deleted for some reason). To improve stability we could use client.replicate() and client.rebalance(). It works quite well, but all those manual tuning steps require experience and attention.


Despite those tricks, I still sometimes get workers restarted or indexes deleted for unknown reasons. I don't know, maybe I use too large amounts of data for the cluster I setup.

With this approach, I'm afraid there's no way to completely avoid large amounts of data being transferred between workers, so scalability is still limited.

On the positive side, we do really get very nice speed-ups with decent amounts of data (millions of points on my laptop)!

@benbovy
Copy link
Member Author

benbovy commented Nov 5, 2020

Broadcasting the query point chunks to all workers may greatly help too:

query_points = ... # dask array

query_points = query_points.persist()

client.replicate(query_points)

This could be the best thing to do actually, since in most cases the whole query data should easily fit in each worker's memory.

Now this is looking good:

Screenshot 2020-11-05 at 09 13 38

@benbovy benbovy marked this pull request as ready for review November 5, 2020 08:48
@benbovy
Copy link
Member Author

benbovy commented Dec 9, 2020

Ok let's merge this. Tests failed because of black (I suspect a version mismatch), so no harm here we can take care of this later.

@benbovy benbovy merged commit f240569 into xarray-contrib:master Dec 9, 2020
@benbovy benbovy mentioned this pull request Dec 15, 2020
@benbovy benbovy deleted the refactor-indexes-dask branch August 4, 2021 10:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants