Commit

Linscan in the new folder structure (FALCONN-LIB#117)
* move to new structure

* spmat config

* fix spmat

* fix linscan config

* linscan config for sparse-small

* linscan config for 1M

* linscan config for sparse-full
ingberam authored Jun 4, 2023
1 parent 61b64ce commit 04e8a5e
Showing 6 changed files with 196 additions and 70 deletions.
22 changes: 19 additions & 3 deletions neurips23/sparse/linscan/Dockerfile
@@ -1,7 +1,23 @@
FROM neurips23

RUN apt-get update
RUN apt-get install -y wget git cmake g++ libaio-dev libgoogle-perftools-dev clang-format libboost-dev python3 python3-setuptools python3-pip
RUN apt-get install -y curl

RUN pip3 install scipy tqdm
# install rust + build tools
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
ENV PATH="/root/.cargo/bin:${PATH}"
RUN git clone --single-branch --branch main https://github.com/pinecone-io/research-bigann-linscan
WORKDIR research-bigann-linscan/

# install maturin (build tool for rust-python)
RUN pip install maturin

# build a whl file
RUN maturin build -r

# pip install the correct wheel (different architectures will produce .whl files with different names)
RUN pip install ./target/wheels/*.whl

# verify that the build worked
RUN python3 -c 'import pylinscan; print(pylinscan.LinscanIndex());'

WORKDIR ..
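
The final RUN above only checks that the wheel imports. A slightly fuller smoke test, assuming nothing about pylinscan beyond the insert and retrieve_parallel calls used in linscan.py below, might look like this:

    # hypothetical smoke test for the freshly built wheel; insert() takes a
    # {dimension: value} dict, retrieve_parallel(queries, k, budget) returns ids
    import pylinscan

    index = pylinscan.LinscanIndex()
    index.insert({0: 1.0, 5: 2.0})
    index.insert({0: 0.5, 7: 1.5})
    print(index.retrieve_parallel([{0: 1.0}], 1, 1.0))  # top-1 match, 1 ms budget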
29 changes: 27 additions & 2 deletions neurips23/sparse/linscan/config.yaml
@@ -2,11 +2,36 @@ sparse-small:
  linscan:
    docker-tag: neurips23-sparse-linscan
    module: neurips23.sparse.linscan.linscan
    constructor: Linscan
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{}]
        query-args: |
          [{"budget":1},{"budget":0.5},{"budget":0.4},{"budget":0.3},{"budget":0.25},{"budget":0.2},{"budget":0.15},{"budget":0.1},{"budget":0.075},{"budget":0.05}]
sparse-1M:
  linscan:
    docker-tag: neurips23-sparse-linscan
    module: neurips23.sparse.linscan.linscan
    constructor: Linscan
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{}]
        query-args: |
          [{"budget":0.5},{"budget":1},{"budget":2},{"budget":4},{"budget":5},{"budget":6},{"budget":7},{"budget":8},{"budget":10}]
sparse-full:
  linscan:
    docker-tag: neurips23-sparse-linscan
    module: neurips23.sparse.linscan.linscan
    constructor: Linscan
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{}]
        query-args: |
          [{"budget":5},{"budget":10},{"budget":20},{"budget":30},{"budget":40},{"budget":50},{"budget":60},{"budget":90},{"budget":150}]
103 changes: 38 additions & 65 deletions neurips23/sparse/linscan/linscan.py
@@ -1,86 +1,59 @@
from __future__ import absolute_import

import numpy as np

from benchmark.algorithms.base import BaseANN
from benchmark.datasets import DATASETS
import pylinscan

# a python wrapper for the linscan algorithm, implemented in rust
# algorithm details: https://arxiv.org/abs/2301.10622
# code: https://github.com/pinecone-io/research-bigann-linscan
#
# Build parameters: none
# Query parameters: budget (in ms) for computing all the scores
class Linscan(BaseANN):
    def __init__(self, metric, index_params):
        assert metric == "ip"
        self.name = "linscan"
        self._index = pylinscan.LinscanIndex()
        self._budget = np.infty
        print("Linscan index initialized: " + str(self._index))

    def fit(self, dataset):  # e.g. dataset = "sparse-small"
        self.ds = DATASETS[dataset]()
        assert self.ds.data_type() == "sparse"

        N_VEC_LIMIT = 100000  # batch size
        it = self.ds.get_dataset_iterator(N_VEC_LIMIT)
        for d in it:
            for i in range(d.shape[0]):
                d1 = d.getrow(i)
                self._index.insert(dict(zip(d1.indices, d1.data)))

        print("Index status: " + str(self._index))

    def load_index(self, dataset):
        return None

    def set_query_arguments(self, query_args):
        self._budget = query_args["budget"]

    def query(self, X, k):
        """Carry out a batch query for k-NN of query set X."""
        nq = X.shape[0]

        # prepare the queries as a list of dicts
        self.queries = []
        for i in range(nq):
            qc = X.getrow(i)
            q = dict(zip(qc.indices, qc.data))
            self.queries.append(q)

        res = self._index.retrieve_parallel(self.queries, k, self._budget)
        self.I = np.array(res, dtype='int32')

    def get_results(self):
        return self.I
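
Taken together, one config entry drives roughly this call sequence against the class above (a sketch of the harness's side of the contract; it assumes the sparse-small dataset is already downloaded, and the query batch here is a stand-in):

    import numpy as np
    from scipy.sparse import csr_matrix

    algo = Linscan("ip", {})                   # constructor + base-args from config.yaml
    algo.fit("sparse-small")                   # streams the dataset into the Rust index
    algo.set_query_arguments({"budget": 0.5})  # one entry from query-args
    X = csr_matrix(np.eye(2, 100000))          # stand-in query batch: two sparse rows
    algo.query(X, 10)
    print(algo.get_results().shape)            # (2, 10) int32 neighbor ids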
7 changes: 7 additions & 0 deletions neurips23/sparse/spmat/Dockerfile
@@ -0,0 +1,7 @@
FROM neurips23

RUN apt-get update
RUN apt-get install -y wget git cmake g++ libaio-dev libgoogle-perftools-dev clang-format libboost-dev python3 python3-setuptools python3-pip

RUN pip3 install scipy

12 changes: 12 additions & 0 deletions neurips23/sparse/spmat/config.yaml
@@ -0,0 +1,12 @@
sparse-small:
  spmat:
    docker-tag: neurips23-sparse-spmat
    module: neurips23.sparse.spmat.spmat
    constructor: SparseMatMul
    base-args: ["@metric"]
    run-groups:
      base:
        args: |
          [{"threads": 8}]
        query-args: |
          [{"alpha":0.5}, {"alpha":0.6}, {"alpha":0.7}, {"alpha":0.8}, {"alpha":0.9}, {"alpha":0.92}, {"alpha":0.94}, {"alpha":0.96}, {"alpha":0.98}, {"alpha":1.0}]
93 changes: 93 additions & 0 deletions neurips23/sparse/spmat/spmat.py
@@ -0,0 +1,93 @@
from __future__ import absolute_import

from scipy.sparse import csr_matrix
import numpy as np

from multiprocessing.pool import ThreadPool

from benchmark.algorithms.base import BaseANN
from benchmark.datasets import DATASETS

# given a vector x, returns another vector with the minimal number of largest elements of x,
# s.t. their sum is at least a times the sum of the elements in x.
#
# The goal is to sparsify the vector further,
# but at the same time try to preserve as much of the original vector as possible.
def largest_elements(x, a):
    # Compute the sum of elements of x
    x_sum = np.sum(x)

    # Compute the indices and values of the largest elements of x
    ind = np.argsort(-x.data)
    cs = np.cumsum(x.data[ind] / x_sum)

    n_elements = min(sum(cs < a) + 1, x.nnz)  # rounding errors sometimes result in n_elements > x.nnz

    new_ind = x.indices[ind[:n_elements]]
    new_data = x.data[ind[:n_elements]]
    return csr_matrix((new_data, new_ind, [0, n_elements]), shape=x.shape)


# a basic sparse index based on sparse matrix multiplication
# methods:
# 1. init: from a csr matrix of data.
# 2. query, with parameters:
#    - k (# of neighbors),
#    - alpha (fraction of the sum of the vector elements to maintain. alpha=1 is exact search).
class SparseMatMul(BaseANN):
    def __init__(self, metric, index_params):
        print(metric, index_params)
        self.name = "spmat"
        self.nt = index_params.get("threads", 1)

    def fit(self, dataset):
        self.ds = DATASETS[dataset]()
        self.data_csc = self.ds.get_dataset().tocsc()

    def load_index(self, dataset):
        return None

    def set_query_arguments(self, query_args):
        self._alpha = query_args["alpha"]

    def _process_single_row(self, i):
        res = self._single_query(q=self.queries.getrow(i))
        self.I[i, :] = [rr[0] for rr in res]

    def _single_query(self, q, k=10):
        if self._alpha == 1:
            q = q.transpose()
        else:
            q = largest_elements(q, self._alpha).transpose()
        # perform (sparse) matrix-vector multiplication
        res = self.data_csc.dot(q)

        if res.nnz <= k:  # if there are fewer than k elements with a nonzero score, simply return them
            return list(zip(res.indices, res.data))
        # extract the top k from the res sparse array directly
        indices = np.argpartition(res.data, -(k + 1))[-k:]
        results = []
        for index in indices:
            results.append((res.data[index], index))
        results.sort(reverse=True)
        return [(res.indices[b], a) for a, b in results]

    def query(self, X, k):  # Carry out a batch query for k-NN of query set X.
        nq = X.shape[0]
        self.I = -np.ones((nq, k), dtype='int32')
        self.queries = X

        if self.nt is None:
            for i in range(nq):
                self._process_single_row(i)
        else:
            with ThreadPool(processes=self.nt) as pool:
                # Map the function to the array of items
                list(pool.imap(self._process_single_row, range(nq)))

    def get_results(self):
        return self.I
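
A quick numeric check of largest_elements with made-up values: for x = [0.1, 0, 0.6, 0.3] and a = 0.5, the sorted nonzeros are 0.6, 0.3, 0.1, and 0.6 alone already covers half the total mass, so only that entry survives:

    import numpy as np
    from scipy.sparse import csr_matrix

    x = csr_matrix(np.array([[0.1, 0.0, 0.6, 0.3]]))
    y = largest_elements(x, 0.5)  # fewest large entries whose sum reaches 50% of the total
    print(y.toarray())            # [[0.  0.  0.6 0. ]]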

