Add support for scalar data to torch and numpy ANN backends, closes #587
davidmezzetti committed Oct 27, 2023
1 parent df6b304 commit 7774fe7
Showing 3 changed files with 127 additions and 12 deletions.
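
Summary: the "quantize" setting now accepts an integer bit count in addition to a boolean. A boolean True keeps the existing Faiss SQ8 scalar quantization, while a non-boolean integer enables n-bit quantization backed by binary indexes and Hamming-distance scoring across the Faiss, NumPy and Torch ANN backends. The following standalone sketch (illustrative only, not code from this diff) mirrors the qbits check the commit adds to each backend:

# Mirrors: quantize if quantize and isinstance(quantize, int)
#          and not isinstance(quantize, bool) else None
def parse_quantize(quantize):
    # bool is a subclass of int in Python, so the explicit bool check is
    # required to keep quantize=True on the legacy SQ8 path
    if quantize and isinstance(quantize, int) and not isinstance(quantize, bool):
        return quantize  # qbits set: n-bit quantization (e.g. 1 = binary)
    return None          # qbits unset: float vectors (or SQ8 when True)

assert parse_quantize(True) is None  # boolean -> legacy scalar quantization
assert parse_quantize(1) == 1        # integer -> binary index path
assert parse_quantize(None) is None  # quantization disabled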
68 changes: 60 additions & 8 deletions src/python/txtai/ann/faiss.py
@@ -7,6 +7,7 @@
 import numpy as np

 from faiss import index_factory, IO_FLAG_MMAP, METRIC_INNER_PRODUCT, read_index, write_index
+from faiss import index_binary_factory, read_index_binary, write_index_binary, IndexBinaryIDMap

 from .base import ANN

@@ -16,9 +17,19 @@ class Faiss(ANN):
     Builds an ANN index using the Faiss library.
     """

+    def __init__(self, config):
+        super().__init__(config)
+
+        # Scalar quantization
+        quantize = self.config.get("quantize")
+        self.qbits = quantize if quantize and isinstance(quantize, int) and not isinstance(quantize, bool) else None
+
     def load(self, path):
+        # Get read function
+        readindex = read_index_binary if self.qbits else read_index
+
         # Load index
-        self.backend = read_index(path, IO_FLAG_MMAP if self.setting("mmap") is True else 0)
+        self.backend = readindex(path, IO_FLAG_MMAP if self.setting("mmap") is True else 0)

     def index(self, embeddings):
         # Compute model training size
@@ -31,7 +42,9 @@ def index(self, embeddings):

         # Configure embeddings index. Inner product is equal to cosine similarity on normalized vectors.
         params = self.configure(embeddings.shape[0], train.shape[0])
-        self.backend = index_factory(embeddings.shape[1], params, METRIC_INNER_PRODUCT)
+
+        # Create index
+        self.backend = self.create(embeddings, params)

         # Train model
         self.backend.train(train)
@@ -58,23 +71,33 @@ def delete(self, ids):
         self.backend.remove_ids(np.array(ids, dtype=np.int64))

     def search(self, queries, limit):
-        # Run the query
+        # Set nprobe and nflip search parameters
         self.backend.nprobe = self.nprobe()
+        self.backend.nflip = self.setting("nflip", self.backend.nprobe)
+
+        # Run the query
         scores, ids = self.backend.search(queries, limit)

         # Map results to [(id, score)]
         results = []
         for x, score in enumerate(scores):
-            results.append(list(zip(ids[x].tolist(), score.tolist())))
+            # Transform scores
+            score = [1.0 - (x / (self.config["dimensions"] * 8)) for x in score.tolist()] if self.qbits else score.tolist()
+
+            # Add results
+            results.append(list(zip(ids[x].tolist(), score)))

         return results

     def count(self):
         return self.backend.ntotal

     def save(self, path):
+        # Get write function
+        writeindex = write_index_binary if self.qbits else write_index
+
         # Write index
-        write_index(self.backend, path)
+        writeindex(self.backend, path)

     def configure(self, count, train):
         """
@@ -94,18 +117,47 @@ def configure(self, count, train):
         if components:
             return components

+        # Derive quantization. Prefer backend-specific setting. Fallback to root-level parameter.
+        quantize = self.setting("quantize", self.config.get("quantize"))
+        quantize = 8 if isinstance(quantize, bool) else quantize
+
         # Get storage setting
-        storage = "SQ8" if self.setting("quantize", self.config.get("quantize")) else "Flat"
+        storage = f"SQ{quantize}" if quantize else "Flat"

         # Small index, use storage directly with IDMap
         if count <= 5000:
-            return f"IDMap,{storage}"
+            return "BFlat" if self.qbits else f"IDMap,{storage}"

         x = self.cells(train)
-        components = f"IVF{x},{storage}"
+        components = f"BIVF{x}" if self.qbits else f"IVF{x},{storage}"

         return components

+    def create(self, embeddings, params):
+        """
+        Creates a new index.
+
+        Args:
+            embeddings: embeddings to index
+            params: index parameters
+
+        Returns:
+            new index
+        """
+
+        # Create binary index
+        if self.qbits:
+            index = index_binary_factory(embeddings.shape[1] * 8, params)
+
+            # Wrap with BinaryIDMap, if necessary
+            if any(x in params for x in ["BFlat", "BHNSW"]):
+                index = IndexBinaryIDMap(index)
+
+            return index
+
+        # Create standard float index
+        return index_factory(embeddings.shape[1], params, METRIC_INNER_PRODUCT)
+
     def cells(self, count):
         """
         Calculates the number of IVF cells for an IVF index.
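The binary path above swaps the float factory strings (IDMap,Flat / IVF{x},SQ8) for binary equivalents (BFlat / BIVF{x}) and converts the Hamming distances Faiss returns into similarity scores. A self-contained sketch of that flow against Faiss directly, assuming packed uint8 vectors as input (the data here is made up):

import numpy as np
from faiss import IndexBinaryIDMap, index_binary_factory

dimensions = 8  # bytes per vector, so 64 bits
data = np.random.randint(0, 256, (100, dimensions), dtype=np.uint8)

# Small index path from configure(): "BFlat" wrapped with IndexBinaryIDMap,
# matching create() above; the dimension is passed in bits (shape[1] * 8)
index = index_binary_factory(dimensions * 8, "BFlat")
index = IndexBinaryIDMap(index)
index.add_with_ids(data, np.arange(len(data), dtype=np.int64))

# Binary indexes return Hamming distances; search() converts them to
# scores with 1.0 - (distance / total number of bits)
distances, ids = index.search(data[:1], 3)
scores = 1.0 - (distances / (dimensions * 8))
print(ids[0], scores[0])  # best match is the query itself with score 1.0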
67 changes: 63 additions & 4 deletions src/python/txtai/ann/numpy.py
@@ -21,6 +21,11 @@ def __init__(self, config):

         # Array function definitions
         self.all, self.cat, self.dot, self.zeros = np.all, np.concatenate, np.dot, np.zeros
+        self.argsort, self.xor = np.argsort, np.bitwise_xor
+
+        # Scalar quantization
+        quantize = self.config.get("quantize")
+        self.qbits = quantize if quantize and isinstance(quantize, int) and not isinstance(quantize, bool) else None

     def load(self, path):
         # Load array from file
@@ -53,11 +58,23 @@ def delete(self, ids):
         self.backend[ids] = self.tensor(self.zeros((len(ids), self.backend.shape[1])))

     def search(self, queries, limit):
-        # Dot product on normalized vectors is equal to cosine similarity
-        scores = self.dot(self.tensor(queries), self.backend.T).tolist()
+        if self.qbits:
+            # Calculate hamming score for integer vectors
+            scores = self.hammingscore(queries)
+        else:
+            # Dot product on normalized vectors is equal to cosine similarity
+            scores = self.dot(self.tensor(queries), self.backend.T)

+        # Get topn ids
+        ids = self.argsort(-scores)[:, :limit]
+
+        # Map results to [(id, score)]
+        results = []
+        for x, score in enumerate(scores):
+            # Add results
+            results.append(list(zip(ids[x].tolist(), score[ids[x]].tolist())))

-        # Add index and sort desc based on score
-        return [sorted(enumerate(score), key=lambda x: x[1], reverse=True)[:limit] for score in scores]
+        return results

     def count(self):
         # Get count of non-zero rows (ignores deleted rows)
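The rewritten search() replaces the per-row Python sort with a vectorized argsort over the negated score matrix; score[ids[x]] then gathers each query's scores in rank order. A quick worked example of that pattern:

import numpy as np

# One query scored against four indexed rows
scores = np.array([[0.2, 0.9, 0.1, 0.5]])

# Negate so argsort returns descending order, then keep the top 2
ids = np.argsort(-scores)[:, :2]  # [[1 3]]

# Gather (id, score) pairs per query, as search() does
results = [list(zip(ids[x].tolist(), score[ids[x]].tolist())) for x, score in enumerate(scores)]
print(results)  # [[(1, 0.9), (3, 0.5)]]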
@@ -81,6 +98,20 @@ def tensor(self, array):

         return array

+    def totype(self, array, dtype):
+        """
+        Casts array to dtype.
+
+        Args:
+            array: input array
+            dtype: dtype
+
+        Returns:
+            array cast as dtype
+        """
+
+        return np.int64(array) if dtype == np.int64 else array
+
     def settings(self):
         """
         Returns settings for this array.
@@ -90,3 +121,31 @@
         """

         return {"numpy": np.__version__}
+
+    def hammingscore(self, queries):
+        """
+        Calculates a hamming distance score.
+
+        This is defined as:
+
+            score = 1.0 - (hamming distance / total number of bits)
+
+        Args:
+            queries: queries array
+
+        Returns:
+            scores
+        """
+
+        # Build table of number of bits for each distinct uint8 value
+        table = 1 << np.arange(8)
+        table = self.tensor(np.array([np.count_nonzero(x & table) for x in np.arange(256)]))
+
+        # Number of different bits
+        delta = self.xor(self.tensor(queries[:, None]), self.backend)
+
+        # Cast to long array
+        delta = self.totype(delta, np.int64)
+
+        # Calculate score as 1.0 - percentage of different bits
+        return 1.0 - (table[delta].sum(axis=2) / (self.config["dimensions"] * 8))
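The popcount lookup in hammingscore() is worth unpacking: table maps every uint8 value to its bit count, table[delta] applies that per byte of the XOR difference, and summing over the byte axis yields the Hamming distance. A small hand-checked sketch using the same definitions:

import numpy as np

# Bit-count lookup table for all 256 uint8 values, as built above
bits = 1 << np.arange(8)
table = np.array([np.count_nonzero(x & bits) for x in np.arange(256)])

# One indexed 2-byte (16-bit) vector and one query
backend = np.array([[0b10101010, 0b11110000]], dtype=np.uint8)
queries = np.array([[0b10101010, 0b11111111]], dtype=np.uint8)

# XOR leaves 1 bits where the vectors differ: only the second byte
# differs, in 4 bit positions
delta = np.bitwise_xor(queries[:, None], backend).astype(np.int64)

# Hamming distance is 4 of 16 bits, so score = 1.0 - 4/16 = 0.75
scores = 1.0 - (table[delta].sum(axis=2) / 16)
print(scores)  # [[0.75]]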
4 changes: 4 additions & 0 deletions src/python/txtai/ann/torch.py
@@ -20,6 +20,7 @@ def __init__(self, config):

         # Define array functions
         self.all, self.cat, self.dot, self.zeros = torch.all, torch.cat, torch.mm, torch.zeros
+        self.argsort, self.xor = torch.argsort, torch.bitwise_xor

     def tensor(self, array):
         # Convert array to Tensor
@@ -29,5 +30,8 @@ def tensor(self, array):

         # Load to GPU device, if available
         return array.cuda() if torch.cuda.is_available() else array

+    def totype(self, array, dtype):
+        return array.long() if dtype == np.int64 else array
+
     def settings(self):
         return {"torch": torch.__version__}
