AnnoyED is a streamified adoption of the Annoy algorithm [1, 2] by Erik Bernhardsson to retrieve approximate nearest neighbors. It continuously builds Annoy Indices on (possibly endless) data streams instead of on self-contained datasets only. The built indices can be queried during runtime (independent from the index building); it is so guaranteed that query results always contain the most current data. This project is implemented in Java, uses Kafka Streams, and was developed in the master’s seminar “Machine Learning for Data Streams” at Hasso Plattner Institute in 2019/20.
Make sure to install the following:
- Java JDK (v8 or newer) (↗ instructions)
- Docker and Docker-Compose (↗ instructions)
- Python3 and PIP3, optional for evaluation framework (↗ instructions)
Then:
- Clone repository via
git clone git@github.com:jonashering/annoyED.git
. - Start the Docker daemon.
- Run
docker-compose up
. This will start and setup Zookeeper and Kafka. - Execute
./gradlew run
to start the annoyED application.
On startup, the application exposes three REST endpoints on localhost:5000
. The relevant one, in the beginning, is /params
:
/params
: To configure the application, make a POST request with the request body<Number of Trees>;<Expected Dataset Size>;<Distance Metric ID>
to this endpoint. The store creates the trees and allocates the space accordingly (use 0 if size is unknown). We currently support 2 distance metrics: Euclidean (ID: 0) and Angular (ID: 1). An example body would be10;0;0
. We would get 10 trees, no preallocated memory and the euclidean distance metric. Be aware that every POST request resets the whole storage. This means that all data that was loaded into the store will be discarded.
Our Kafka Streams application uses two topics: source-topic
for input and sink-topic
for output. JSON serialized messages are written and read by our app. Messages to the source-topic
should have the following format:
{ 'datapointID': 0,
'vector': [0.3, 0.7, 255, ...],
'persist': true,
'write': false,
'k': 10 }
Use the flags persist=True
to add a point to the index and write=True
to retrieve its nearest neighbors and write them to the sink-topic
as explained in detail here. The application then outputs JSON serialized NearestNeighbors object to the sink-topic
in the following format; list contains the datapointID
s of the k nearest neighbors sorted by distance:
{ 'list': [1, 5, 4, 3] }
-
/query
: After having added some points to your index, start querying nearest neighbors for a new point by making a POST request with a JSON serialized Datapoint as request body (as described above) to this endpoint. The result will be a JSON serialized NearestNeighbors object with the ids of the k nearest neighbor points. -
/trees
: To inspect the built indices, send a GET request to this endpoint. The response will be a JSON serialized IndexTree. Be aware that the returned object can be very large. We only recommend using this endpoint for testing purposes with few data.
Erik Bernhardsson, the creator of Annoy, also developed a benchmarking framework called ann-benchmarks [3], which we use to evaluate our implementation. To run the benchmarks on AnnoyED, use our fork of ann-benchmarks found here which includes a custom AnnoyED wrapper, i.e. datapoint producer, as well as the required configuration files. Then, follow the README there. In short this means:
git clone git@github.com:MariusDanner/ann-benchmarks.git # clone repository
cd ann-benchmarks
pip install -r requirements.txt
python run.py --local --dataset [dataset_id]# run the benchmark tool
python plot.py # show and plot the results
Finding similar records in datasets can facilitate solutions for many interesting use-cases. For example, recommendation engines for music services: We can recommend new interesting songs to listeners based on what other similar listeners previously listened to. Since both listeners are somewhat similar, it is not unlikely that they will perceive these recommendations as reasonably customized and meaningful.
K-NN formalizes searching similar points in a dataset as follows: Given a dataset of points in a vector space (e.g. a three-dimensional space), an arbitrary distance function (e.g. Manhattan distance), for each point p in the dataset we want to retrieve the k points that are closest to p based on the selected distance function, i.e. the k nearest neighbors [8]. Usually, these neighbors are then used for classification or regression tasks.
A naive solution on a self-contained dataset is to compute distances between all pairs of points in the dataset and infer the k nearest neighbors from these results. This naive approach has quadratic time complexity wrt. the number of points in the dataset. While smart optimizations to this naive solution exist, the time complexity of k nearest neighbor search algorithms remains a bottleneck [7].
However for many use-cases, determining the exact k nearest neighbors is not a crucial requirement. For example when using k-NN for recommendation engines, it is usually reasonably good to compromise on quality, i.e. allow leaving out some true nearest neighbors, if it leads to a faster query performance. This is where approximate k-NN comes into place. Approximate nearest neighbor search is a relaxed definition of k-NN. Given a relative error of a point p is considered an approximate nearest neighbor of point q, if , where p* is a true nearest neighbor of q. Thus, p is within relative error of the true nearest neighbor [7].
Approximate k-NN approaches were firstly developed for static datasets. When the underlying data changes, this requires however recalculating the entire model. This is costly and since models are then trained on snapshots of the dataset, queries might be answered based on deprecated data. To tackle this issue, we adopted the popular approximate k-NN approach Annoy [1, 2] and transformed it into a streamified approach.
Annoy [1, 2] uses a spatial indexing strategy with binary trees to store and retrieve points efficiently. Generally, the root node as well as every inner node in this tree represents a split, while every leaf node is a sink that holds a fixed number of points.
Building Annoy Index: An index tree is built in a top-down procedure: Consider a set of points, i.e. your dataset, two points are randomly picked from this set. Between these two points, a bisecting hyperplane is constructed that is equidistant to both points. This split constitutes the root node in our index tree. It has a left and a right child node. Each point in the dataset is assigned to either one of the children nodes based on which side of the drawn hyperplane it is located. This process of splitting the dataset into smaller and smaller chunks is recursively applied to every new child node until we have an index tree in which each leaf node holds at most a fixed number of points. The result of this process can be seen in the illustration above. In result, the constructed index trees with its split nodes and hyperplanes partition the space in which our points are located into small subspaces that are roughly equally populated.
Querying Annoy Index: Consider a query point for which you want to determine its k approximate nearest neighbors. We start with the root node of our previously built index tree and check on which side of its hyperplane our query point is located. Based on that result, we proceed to traverse the tree with either the left or right child node. We traverse the tree in this fashion until we reach a leaf node. All points that this leaf node holds are considered nearest neighbor candidates. We proceed by calculating the distance to these points and the query point. The k points with the lowest distance are the resulting k nearest neighbors.
Using a forest instead of a single tree: Our simplified explanation makes it easy to spot a major flaw in this concept: If we are unfortunate, splits could easily separate close points into different partitions, e.g. if they are very close to a hyperplane. For that reason, instead of constructing a single index tree, we construct a forest of n index trees that are all constructed randomly, i.e. each is different from one another. With every query, we search all index trees in this forest. By this, the problem of missing nearest neighbors due to unfortunate splits is reduced.
In the original implementation of Annoy, the built index is immutable. This results in it being impractical for an approach where the data originates from a continuous data stream rather than a static dataset. However, since the index trees constructed when building the index are simply binary trees, i.e. only leaves hold data and they do not need to be balanced, we can make them mutable and thus suitable for our approach with few modifications:
While the querying procedure remains unchanged, we add the following modification to constructing the index trees: Points can be added to an existing index tree at any time, the correct leaf node to hold the new point is found by traversing the index tree similarly to the querying process, i.e. until the corresponding subspace is found. Each leaf node has a fixed capacity. Until this capacity is reached, we can add new points to it. When it is reached, a new spit is introduced by inserting a new hyperplane in its space and partitioning the points into two leaves accordingly.
Step 1: Traverse IndexTree to the corresponding leaf |
Step 2: Insert new point at that leaf, split leaf if it reaches capacity (in this example: capacity=8) |
---|
Apart from the Kafka-relevant components, we defined the following classes to store and retrieve approximate nearest neighbors:
Datapoint
s have an id which uniquely identifies them and is used as a key to store the Datapoint
in a global lookup table. The data vector holds the position of the datapoint in the vector space. The persist flag tells the NeighborProcessor
whether or not the datapoint should be written into the index. The write flag tells the NeighborProcessor
if the nearest neighbors should be retrieved and written to the sink-topic
. The k variable is only used if the write flag is set and tells the IndexStore
how many nearest neighbors should be retrieved.
A Datapoint
is written to the Kafka Topic source-topic
through our Producer
or an external application, i.e. the evaluation framework, to which the NeighborProcessor
is subscribed. Depending on if the Datapoint
should be written to the index and/or its nearest neighbors should be retrieved, it is proceeded as follows:
Adding a Datapoint to the Tree Index: The NeighborProcessor
calls the write method of the IndexStore
. If the distance metric is angular, the vector of the Datapoint
is converted to a unit vector as a preprocessing step. After this, the Datapoint
gets inserted into a lookup table (HashMap) which maps the unique datapointID to the Datapoint
. The Datapoint
is now being inserted into each one of the IndexTree
s. If the Datapoint
is the first point to be inserted into the tree, the parameter _ k, which determines the maximum capacity of a leaf, is set to the number of dimensions of the datapoint rounded up to the next hundred. Now, the correct IndexNode
of each IndexTree
is found by traversing the nodes of the tree starting by its root node and continuing with either its leftChild or rightChild based on which side of the IndexSplit
the Datapoint
is located. We stop when we find a leaf, i.e. a node without children.
The datapointID is then added to the data list of the node. If the node data contains less than _ k elements now, the insert operation is finished. Otherwise, a left and a right child node and an IndexSplit
are created. First, two Datapoint
s are selected to create the split randomly (or alternatively by an approximate 2-Means procedure). Then, a hyperplane (equidistant to both points) is constructed and persisted in the IndexSplit
. After this, all Datapoint
s of the node are moved either to the leftChild or the rightChild depending on which side of the hyperplane they are located.
Querying the Nearest Neighbors of a Datapoint: To query the k neighbors of the Datapoint
, candidates from all trees are selected. First, a HashMap<Integer, Double>
which maps the datapointID
to the distance to the query point is created. It acts as a cache so that distances between the same pairs of Datapoints
do not need to be calculated redundantly. In each IndexTree
, the correct leaf node is determined in the procedure as on insert. Then, for each Datapoint
in the leaf node, the distance to the query point is determined. This is done by looking up the datapointID in the previously created cache or calculating the distance and caching it. Then, all Datapoint
s of the leaf node are sorted by distance to the query point and the closest k Datapoint
s are returned. In the last step, all returned Datapoint
s (from all IndexTrees
) are merged in a custom merge-sort-like procedure until the top k nearest neighbor Datapoint
s have been determined. These are then returned and written to the sink-topic
as NeighborCandidates
.
The ann-benchmarks framework [3] provides access to multiple datasets and quality measures to evaluate approximate nearest neighbor approaches. We used it with a custom annoyED wrapper and measured recall of the k nearest neighbor result as (i.e. recall is equivalent to precision in this use case) as well as the number of processed queries per second as QPS (this excludes model training) on these reference datasets:
Dataset | Dimensions | Train size | Test size | Neighbors | Distance |
---|---|---|---|---|---|
MNIST [4] | 784 | 60,000 | 10,000 | 100 | Euclidean |
Fashion-MNIST [6] | 784 | 60,000 | 10,000 | 100 | Euclidean |
NYTimes (Bag of Words) [5] | 256 | 290,000 | 10,000 | 100 | Angular |
In the process of developing annoyED, we started with a straightforward implementation of our approach and later implemented a number of small optimizations to increase both quality and throughput. We created benchmarks for these changes with the MNIST dataset [4], Euclidean distance and k = 10.
Baseline: Already with our baseline (No Caching + Random), where we select split candidates randomly and otherwise proceed as described in the previous section but leabing out the aforementioned query caching, we achieve a precision of up to 0.87 at 31 QPS with 5 index trees. As expected, with more trees our model can answer queries with higher recall, i.e. unfavorable partitions in one index tree are compensated by the other trees, but however at the cost of slower throughput due to the increased complexity.
2-Means Split Candidate Selection: Randomly selecting the points between which the splitting hyperplane is constructed in a split can lead to unbalanced splits, e.g. if a selected point is an outlier. This will then lead to an unbalanced and unnecessarily deep tree structure in our index. To approach this problem, we added a k-means approximation with two centroids to create an optimally splitting hyperplane (look here for how it works in detail). With this optimization, we achieve a higher precision of up to 0.92 while at the same time maintaining the throughput of 31 QPS with 5 trees. We observe an increased QPS for less trees as well. Splitting the points in a more balanced way and thus creating more quality index trees is therefore a meaningful addition.
However: When later running the experiments on the NYTimes dataset, we discovered that the 2-means algorithm might decrease the performance. Since the capacity of the index tree leaves depends on the dimensionality of points, low-dimensional datasets trigger splitting more often. The costly 2-means, as well as the large dataset size of NYTimes, makes the index building slower, we thus made it optional.
Caching intermediate distance results: A quick recap of our implementation: When searching nearest neighbors, each index tree is traversed to the correct leaf node and from its set of points, the k closest points are forwarded as neighbor candidates where they are then merged to the true k results. Even though every index tree randomly partitions its points differently, it is likely that there is a common subset of neighbor candidates, i.e. the points closest, among all index trees. In our baseline implementation, this led to distances being calculated redundantly for every index tree. As an optimization, we cache the distances and so avoid duplicate computation. This does not lead to quality improvement, however, it improves the throughput to 40 QPS with 5 trees. In fact, we can now train more trees and while maintaining the same throughput we previously had with fewer trees, e.g. instead of 4 trees we now can have 5 at 40 QPS increasing the recall by 0.04.
Parallelization of Distance Calculation: We also experimented with parallelizing the distance calculation when retrieving points from the index. Our implementation collected the set of neighbor candidates from all index trees and then distributed the distance calculation to all available cores using the Java interface list.parallelStream().map(_ -> _)
. This did, unfortunately, decrease the speed. However, our testing setup is a machine with 4 (virtual) cores, so on a better-powered machine this might change.
Additionally, we tested how well our implementation works with different settings of k. We observe, that the throughput remains relatively equal for different k. We expected this behavior, since sorting all leaf candidates is equally complex for every k, only the later merge step benefits from smaller k; however, its duration is insignificant. Also, that lower k result in higher recall results is not surprising; due to the partitioning process very close points likely end up in the same partition, points further distant are more likely to be cut off to other partitions.
With our implementation working reasonably well on MNIST with euclidean distance, we also tested it on the similar Fashion-MNIST dataset (euclidean distance) and on the NYTimes dataset (angular distance) with k = 10. We generally observe that with a higher number of index trees, the recall increases while the QPS decreases.
The NYTimes dataset has 5 times the points as MNIST but only 256 dimensions. This results in a lower _k (than MNIST) and thus having less candidates per tree. This means that every tree holds a smaller fraction of available points (maximum 300/290000 = 0.001 (while for MNIST, each tree holds a maximum fraction of 0.013) so retrieving nearest neighbors is more difficult which is also confirmed by the recall results [3] of other algorithms. Fashion-MNIST on the other hand performs very similar to MNIST. The reason for this is it being equal to MNIST in terms of dimensionality, size and the used distance measure. The insignificant differences are likely due to chance at runtime.
Our project uses the core ideas from Annoy [1, 2], so we were interested in how well the different implementations compare on MNIST. Both implementations achieve similar high recall values; with 15 trees annoyED achieves a recall of 0.97, however at the price of few QPS of 14.6. Annoy on the other hand achieves an recall of 0.99 with 83.3 QPS. Another example: to get a recall of 0.79 annoyED has 30.2 QPS while Annoy achieves 3940 QPS.
To consider is the following: Annoy is a highly optimized library written in C++, it uses AVX [9] where possible to accelerate for-loops using SIMD operations. Also, Annoy does not allow modifying built indices which allows further optimizations. Plus, Annoy is directly accessed through a C++/Python interface while annoyED uses Kafka topics which leads to networking overhead.
We implemented an approximate nearest neighbor search prototype for data streams with Kafka Streams. Our solution is based on the Annoy algorithm [1, 2] and adopts its key ideas. As opposed to Annoy, annoyED does not implement separate model training and inference phases but its indices can be extended and queried continuously. Therefore, it is especially suitable when nearest neighbor search is realized on volatile data, e.g. data streams. AnnoyED works reasonably well on a number of different datasets and supports multiple distance measures. However, its performance is not on par with the highly optimized Annoy library.
Implementing annoyED, Kafka Streams turned out to be a suitable framework. While we needed to develop a custom state stores and processor, the Kafka Streams API allowed a flexible architecture according to our requirements. However, we expect adding a reasonable windowing strategy, which is not yet included, to be an elaborate effort due to its complexity.
Current limitations to consider are the following: We only ran and evaluated annoyED in a non-distributed setting, thus adding a layer of parallelization by e.g. multiplying the number of index stores and running them on separate instances is a task for future work. Also, a custom windowing strategy to allow for old data to be forgotten and thus account for e.g. concept drifts is an important step before running annoyED in a production setting.
[1]: Bernhardsson, E. (2015). “Nearest Neighbor Methods And Vector Models: Part 1&2” (Blogpost). https://erikbern.com/2015/09/24/nearest-neighbor-methods-vector-models-part-1, https://erikbern.com/2015/10/01/nearest-neighbors-and-vector-models-part-2-how-to-search-in-high-dimensional-spaces.html.
[2]: Spotify Inc. (since 2013). “Annoy: Approximate Nearest Neighbors Oh Yeah” (Repository). https://github.com/spotify/annoy.
[3]: Aumüller, M.; Bernhardsson, E.; Faithfull, A.J. (2018). “ANN-Benchmarks: A Benchmarking Tool for Approximate Nearest Neighbor Algorithms” (PDF). CoRR, abs/1807.05614. Also: http://ann-benchmarks.com/, https://github.com/erikbern/ann-benchmarks/.
[4]: LeCun, Y.; Cortes, C; Burges, CJ (2010). “MNIST handwritten digit database” (Online). ATT Labs. http://yann.lecun.com/exdb/mnist/.
[5]: Newman, D. (2008). “Bag of Words Data Set: NYTimes” (Online). University of California, Irvine. https://archive.ics.uci.edu/ml/datasets/bag+of+words.
[6]: Xiao, H.; Rasul, K; Vollgraf, R. (2017). “Fashion-MNIST: a Novel Image Dataset for Benchmarking Machine Learning Algorithms” (PDF). CoRR, abs/1708.07747. Also: https://github.com/zalandoresearch/fashion-mnist.
[7]: Arya, S.; Mount, D. M.; Netanyahu, N. S.; Silverman, R.; Wu, A. (1998). "An optimal algorithm for approximate nearest neighbor searching" (PDF). Journal of the ACM. 45 (6): 891–923.
[8]: Dasarathy, Belur V., ed. (1991). Nearest Neighbor (NN) Norms: NN Pattern Classification Techniques. ISBN 978-0-8186-8930-7.
[9]: Intel Corp. (2020). "Intel® Advanced Vector Extensions 512" (Online). https://www.intel.com/content/www/us/en/architecture-and-technology/avx-512-overview.html.